diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index e631c090..56800329 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -926,12 +926,21 @@ const docTemplateClusterAPI = `{ "type": "object", "properties": { "basic": { + "description": "Passwords for BasicAuth", + "type": "array", + "items": { + "type": "string" + } + }, + "session": { + "description": "Secrets for session JWT", "type": "array", "items": { "type": "string" } }, "token": { + "description": "Tokens/Streamkey for RTMP and SRT", "type": "array", "items": { "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index c023685b..b7d6a1f3 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -918,12 +918,21 @@ "type": "object", "properties": { "basic": { + "description": "Passwords for BasicAuth", + "type": "array", + "items": { + "type": "string" + } + }, + "session": { + "description": "Secrets for session JWT", "type": "array", "items": { "type": "string" } }, "token": { + "description": "Tokens/Streamkey for RTMP and SRT", "type": "array", "items": { "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index 462367b3..d39617a3 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -186,10 +186,17 @@ definitions: identity.UserAuthServices: properties: basic: + description: Passwords for BasicAuth + items: + type: string + type: array + session: + description: Secrets for session JWT items: type: string type: array token: + description: Tokens/Streamkey for RTMP and SRT items: type: string type: array diff --git a/docs/docs.go b/docs/docs.go index 8367ebce..42902db7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -3493,6 +3493,77 @@ const docTemplate = `{ } } }, + "/api/v3/session/token/{username}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Request access tokens", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Request access tokens", + "operationId": "session-3-create-token", + "parameters": [ + { + "type": "string", + "description": "Username", + "name": "username", + "in": "path", + "required": true + }, + { + "description": "Token request", + "name": "config", + "in": "body", + "required": true, + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.SessionTokenRequest" + } + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.SessionTokenRequest" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/skills": { "get": { "security": [ @@ -4084,13 +4155,10 @@ const docTemplate = `{ "description": "ip:port", "type": "string" }, - "bootstrap": { - "type": "boolean" - }, "debug": { "type": "boolean" }, - "emergency_leader_timeout": { + "emergency_leader_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4098,7 +4166,7 @@ const docTemplate = `{ "enable": { "type": "boolean" }, - "node_recover_timeout": { + "node_recover_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4109,10 +4177,7 @@ const docTemplate = `{ "type": "string" } }, - "recover": { - "type": "boolean" - }, - "sync_interval": { + "sync_interval_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4842,6 +4907,12 @@ const docTemplate = `{ "type": "string" } }, + "session": { + "type": "array", + "items": { + "type": "string" + } + }, "token": { "type": "array", "items": { @@ -6023,7 +6094,8 @@ const docTemplate = `{ "format": "int64" }, "extra": { - "type": "string" + "type": "object", + "additionalProperties": true }, "id": { "type": "string" @@ -6160,6 +6232,27 @@ const docTemplate = `{ } } }, + "api.SessionTokenRequest": { + "type": "object", + "properties": { + "extra": { + "type": "object", + "additionalProperties": true + }, + "match": { + "type": "string" + }, + "remote": { + "type": "array", + "items": { + "type": "string" + } + }, + "token": { + "type": "string" + } + } + }, "api.SessionsActive": { "type": "object", "additionalProperties": { @@ -6274,13 +6367,10 @@ const docTemplate = `{ "description": "ip:port", "type": "string" }, - "bootstrap": { - "type": "boolean" - }, "debug": { "type": "boolean" }, - "emergency_leader_timeout": { + "emergency_leader_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -6288,7 +6378,7 @@ const docTemplate = `{ "enable": { "type": "boolean" }, - "node_recover_timeout": { + "node_recover_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -6299,10 +6389,7 @@ const docTemplate = `{ "type": "string" } }, - "recover": { - "type": "boolean" - }, - "sync_interval": { + "sync_interval_sec": { "description": "seconds", "type": "integer", "format": "int64" diff --git a/docs/swagger.json b/docs/swagger.json index 1e89694e..6c692e25 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3485,6 +3485,77 @@ } } }, + "/api/v3/session/token/{username}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Request access tokens", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Request access tokens", + "operationId": "session-3-create-token", + "parameters": [ + { + "type": "string", + "description": "Username", + "name": "username", + "in": "path", + "required": true + }, + { + "description": "Token request", + "name": "config", + "in": "body", + "required": true, + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.SessionTokenRequest" + } + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.SessionTokenRequest" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/skills": { "get": { "security": [ @@ -4076,13 +4147,10 @@ "description": "ip:port", "type": "string" }, - "bootstrap": { - "type": "boolean" - }, "debug": { "type": "boolean" }, - "emergency_leader_timeout": { + "emergency_leader_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4090,7 +4158,7 @@ "enable": { "type": "boolean" }, - "node_recover_timeout": { + "node_recover_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4101,10 +4169,7 @@ "type": "string" } }, - "recover": { - "type": "boolean" - }, - "sync_interval": { + "sync_interval_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -4834,6 +4899,12 @@ "type": "string" } }, + "session": { + "type": "array", + "items": { + "type": "string" + } + }, "token": { "type": "array", "items": { @@ -6015,7 +6086,8 @@ "format": "int64" }, "extra": { - "type": "string" + "type": "object", + "additionalProperties": true }, "id": { "type": "string" @@ -6152,6 +6224,27 @@ } } }, + "api.SessionTokenRequest": { + "type": "object", + "properties": { + "extra": { + "type": "object", + "additionalProperties": true + }, + "match": { + "type": "string" + }, + "remote": { + "type": "array", + "items": { + "type": "string" + } + }, + "token": { + "type": "string" + } + } + }, "api.SessionsActive": { "type": "object", "additionalProperties": { @@ -6266,13 +6359,10 @@ "description": "ip:port", "type": "string" }, - "bootstrap": { - "type": "boolean" - }, "debug": { "type": "boolean" }, - "emergency_leader_timeout": { + "emergency_leader_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -6280,7 +6370,7 @@ "enable": { "type": "boolean" }, - "node_recover_timeout": { + "node_recover_timeout_sec": { "description": "seconds", "type": "integer", "format": "int64" @@ -6291,10 +6381,7 @@ "type": "string" } }, - "recover": { - "type": "boolean" - }, - "sync_interval": { + "sync_interval_sec": { "description": "seconds", "type": "integer", "format": "int64" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 2a664f8d..312690c0 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -274,17 +274,15 @@ definitions: address: description: ip:port type: string - bootstrap: - type: boolean debug: type: boolean - emergency_leader_timeout: + emergency_leader_timeout_sec: description: seconds format: int64 type: integer enable: type: boolean - node_recover_timeout: + node_recover_timeout_sec: description: seconds format: int64 type: integer @@ -292,9 +290,7 @@ definitions: items: type: string type: array - recover: - type: boolean - sync_interval: + sync_interval_sec: description: seconds format: int64 type: integer @@ -782,6 +778,10 @@ definitions: items: type: string type: array + session: + items: + type: string + type: array token: items: type: string @@ -1652,7 +1652,8 @@ definitions: format: int64 type: integer extra: - type: string + additionalProperties: true + type: object id: type: string local: @@ -1746,6 +1747,20 @@ definitions: format: uint64 type: integer type: object + api.SessionTokenRequest: + properties: + extra: + additionalProperties: true + type: object + match: + type: string + remote: + items: + type: string + type: array + token: + type: string + type: object api.SessionsActive: additionalProperties: items: @@ -1820,17 +1835,15 @@ definitions: address: description: ip:port type: string - bootstrap: - type: boolean debug: type: boolean - emergency_leader_timeout: + emergency_leader_timeout_sec: description: seconds format: int64 type: integer enable: type: boolean - node_recover_timeout: + node_recover_timeout_sec: description: seconds format: int64 type: integer @@ -1838,9 +1851,7 @@ definitions: items: type: string type: array - recover: - type: boolean - sync_interval: + sync_interval_sec: description: seconds format: int64 type: integer @@ -4653,6 +4664,52 @@ paths: summary: Get a minimal summary of all active sessions tags: - v16.7.2 + /api/v3/session/token/{username}: + put: + consumes: + - application/json + description: Request access tokens + operationId: session-3-create-token + parameters: + - description: Username + in: path + name: username + required: true + type: string + - description: Token request + in: body + name: config + required: true + schema: + items: + $ref: '#/definitions/api.SessionTokenRequest' + type: array + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/api.SessionTokenRequest' + type: array + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Request access tokens + tags: + - v16.?.? /api/v3/skills: get: description: List all detected FFmpeg capabilities. diff --git a/ffmpeg/collector.go b/ffmpeg/collector.go index 9df473f6..6b6ccf0c 100644 --- a/ffmpeg/collector.go +++ b/ffmpeg/collector.go @@ -33,7 +33,7 @@ func (w *wrappedCollector) RegisterAndActivate(id, reference, location, peer str w.Collector.RegisterAndActivate(w.prefix+id, w.reference, location, peer) } -func (w *wrappedCollector) Extra(id, extra string) { +func (w *wrappedCollector) Extra(id string, extra map[string]interface{}) { w.Collector.Extra(w.prefix+id, extra) } diff --git a/http/api/iam.go b/http/api/iam.go index 2f4eccc3..7b87c51d 100644 --- a/http/api/iam.go +++ b/http/api/iam.go @@ -28,8 +28,9 @@ func (u *IAMUser) Marshal(user identity.User, policies []access.Policy) { }, }, Services: IAMUserAuthServices{ - Basic: user.Auth.Services.Basic, - Token: user.Auth.Services.Token, + Basic: user.Auth.Services.Basic, + Token: user.Auth.Services.Token, + Session: user.Auth.Services.Session, }, } @@ -59,8 +60,9 @@ func (u *IAMUser) Unmarshal() (identity.User, []access.Policy) { }, }, Services: identity.UserAuthServices{ - Basic: u.Auth.Services.Basic, - Token: u.Auth.Services.Token, + Basic: u.Auth.Services.Basic, + Token: u.Auth.Services.Token, + Session: u.Auth.Services.Session, }, }, } @@ -95,8 +97,9 @@ type IAMUserAuthAPIAuth0 struct { } type IAMUserAuthServices struct { - Basic []string `json:"basic"` - Token []string `json:"token"` + Basic []string `json:"basic"` + Token []string `json:"token"` + Session []string `json:"session"` } type IAMAuth0Tenant struct { diff --git a/http/api/session.go b/http/api/session.go index d76275c4..cb5ef8d8 100644 --- a/http/api/session.go +++ b/http/api/session.go @@ -22,16 +22,16 @@ type SessionPeers struct { // Session represents an active session type Session struct { - ID string `json:"id"` - Reference string `json:"reference"` - CreatedAt int64 `json:"created_at" format:"int64"` - Location string `json:"local"` - Peer string `json:"remote"` - Extra string `json:"extra"` - RxBytes uint64 `json:"bytes_rx" format:"uint64"` - TxBytes uint64 `json:"bytes_tx" format:"uint64"` - RxBitrate json.Number `json:"bandwidth_rx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s - TxBitrate json.Number `json:"bandwidth_tx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s + ID string `json:"id"` + Reference string `json:"reference"` + CreatedAt int64 `json:"created_at" format:"int64"` + Location string `json:"local"` + Peer string `json:"remote"` + Extra map[string]interface{} `json:"extra"` + RxBytes uint64 `json:"bytes_rx" format:"uint64"` + TxBytes uint64 `json:"bytes_tx" format:"uint64"` + RxBitrate json.Number `json:"bandwidth_rx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s + TxBitrate json.Number `json:"bandwidth_tx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s } func (s *Session) Unmarshal(sess session.Session) { @@ -140,3 +140,10 @@ func (summary *SessionSummary) Unmarshal(sum session.Summary) { summary.Summary.TotalRxBytes = sum.Summary.TotalRxBytes / 1024 / 1024 summary.Summary.TotalTxBytes = sum.Summary.TotalTxBytes / 1024 / 1024 } + +type SessionTokenRequest struct { + Match string `json:"match"` + Remote []string `json:"remote"` + Extra map[string]interface{} `json:"extra"` + Token string `json:"token,omitempty"` +} diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 4e39ceef..132c7575 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -56,31 +56,31 @@ func (h *RestreamHandler) Add(c echo.Context) error { } if err := util.ShouldBindJSON(c, &process); err != nil { - return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return api.Err(http.StatusBadRequest, "", "Invalid JSON: %s", err) } if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "Forbidden") + return api.Err(http.StatusForbidden, "", "You are not allowed to write this process") } if !superuser { if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "Forbidden") + return api.Err(http.StatusForbidden, "", "The owner '%s' is not allowed to write this process", process.Owner) } } if process.Type != "ffmpeg" { - return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg") + return api.Err(http.StatusBadRequest, "", "Unsupported process type, supported process types are: ffmpeg") } if len(process.Input) == 0 || len(process.Output) == 0 { - return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") + return api.Err(http.StatusBadRequest, "", "At least one input and one output need to be defined") } config, metadata := process.Marshal() if err := h.restream.AddProcess(config); err != nil { - return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) + return api.Err(http.StatusBadRequest, "", "Invalid process config: %s", err.Error()) } tid := app.ProcessID{ @@ -272,7 +272,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { } if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") { - return api.Err(http.StatusForbidden, "Forbidden") + return api.Err(http.StatusForbidden, "", "You are not allowed to write this process: %s", id) } tid := app.ProcessID{ @@ -293,12 +293,12 @@ func (h *RestreamHandler) Update(c echo.Context) error { } if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "Forbidden") + return api.Err(http.StatusForbidden, "", "You are not allowed to write this process: %s", process.ID) } if !superuser { if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "Forbidden") + return api.Err(http.StatusForbidden, "", "The owner '%s' is not allowed to write this process: %s", process.Owner, process.ID) } } diff --git a/http/handler/api/session.go b/http/handler/api/session.go index bc45f5cd..e3b2169a 100644 --- a/http/handler/api/session.go +++ b/http/handler/api/session.go @@ -6,6 +6,7 @@ import ( "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" + "github.com/datarhei/core/v16/iam" "github.com/datarhei/core/v16/session" "github.com/labstack/echo/v4" @@ -14,12 +15,14 @@ import ( // The SessionHandler type provides handlers to retrieve session information type SessionHandler struct { registry session.RegistryReader + iam iam.IAM } // NewSession returns a new Session type. You have to provide a session registry. -func NewSession(registry session.RegistryReader) *SessionHandler { +func NewSession(registry session.RegistryReader, iam iam.IAM) *SessionHandler { return &SessionHandler{ registry: registry, + iam: iam, } } @@ -77,3 +80,52 @@ func (s *SessionHandler) Active(c echo.Context) error { return c.JSON(http.StatusOK, sessionsActive) } + +// Request access tokens +// @Summary Request access tokens +// @Description Request access tokens +// @Tags v16.?.? +// @ID session-3-create-token +// @Accept json +// @Produce json +// @Param username path string true "Username" +// @Param config body []api.SessionTokenRequest true "Token request" +// @Success 200 {array} api.SessionTokenRequest +// @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/session/token/{username} [put] +func (s *SessionHandler) CreateToken(c echo.Context) error { + username := util.PathParam(c, "username") + + identity, err := s.iam.GetVerifier(username) + if err != nil { + return api.Err(http.StatusNotFound, "", "%s", err) + } + + request := []api.SessionTokenRequest{} + + if err := util.ShouldBindJSONValidation(c, &request, false); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + for _, r := range request { + err := c.Validate(r) + if err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + } + + for i, req := range request { + data := map[string]interface{}{ + "match": req.Match, + "remote": req.Remote, + "extra": req.Extra, + } + + request[i].Token = identity.GetServiceSession(data) + } + + return c.JSON(http.StatusOK, request) +} diff --git a/http/handler/api/session_test.go b/http/handler/api/session_test.go index 2f2720c9..135b8b9a 100644 --- a/http/handler/api/session_test.go +++ b/http/handler/api/session_test.go @@ -20,7 +20,7 @@ func getDummySessionRouter() *echo.Echo { collector := registry.Collector("foo") collector.RegisterAndActivate("foobar", "", "any", "any") - handler := NewSession(registry) + handler := NewSession(registry, nil) router.Add("GET", "/summary", handler.Summary) router.Add("GET", "/active", handler.Active) diff --git a/http/middleware/iam/iam.go b/http/middleware/iam/iam.go index b12ee398..ded97e9e 100644 --- a/http/middleware/iam/iam.go +++ b/http/middleware/iam/iam.go @@ -185,23 +185,30 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { domain = c.QueryParam("domain") resource = "api:" + resource } else { - identity, err = mw.findIdentityFromBasicAuth(c) + identity, err = mw.findIdentityFromSession(c) if err != nil { - if err == ErrAuthRequired { - c.Response().Header().Set(echo.HeaderWWWAuthenticate, "Basic realm="+realm) - return api.Err(http.StatusUnauthorized, "Unauthorized", "%s", err) - } else { - if config.WaitAfterFailedLogin { - time.Sleep(5 * time.Second) - } + return api.Err(http.StatusForbidden, "Forbidden", "%s", err) + } - if err == ErrBadRequest { - return api.Err(http.StatusBadRequest, "Bad request", "%s", err) - } else if err == ErrUnauthorized { + if identity == nil { + identity, err = mw.findIdentityFromBasicAuth(c) + if err != nil { + if err == ErrAuthRequired { c.Response().Header().Set(echo.HeaderWWWAuthenticate, "Basic realm="+realm) return api.Err(http.StatusUnauthorized, "Unauthorized", "%s", err) } else { - return api.Err(http.StatusForbidden, "Forbidden", "%s", err) + if config.WaitAfterFailedLogin { + time.Sleep(5 * time.Second) + } + + if err == ErrBadRequest { + return api.Err(http.StatusBadRequest, "Bad request", "%s", err) + } else if err == ErrUnauthorized { + c.Response().Header().Set(echo.HeaderWWWAuthenticate, "Basic realm="+realm) + return api.Err(http.StatusUnauthorized, "Unauthorized", "%s", err) + } else { + return api.Err(http.StatusForbidden, "Forbidden", "%s", err) + } } } } @@ -239,7 +246,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { } } -var ErrAuthRequired = errors.New("unauthorized") +var ErrAuthRequired = errors.New("authentication required") var ErrUnauthorized = errors.New("unauthorized") var ErrBadRequest = errors.New("bad request") @@ -302,6 +309,61 @@ func (m *iammiddleware) findIdentityFromBasicAuth(c echo.Context) (iamidentity.V return identity, nil } +func (m *iammiddleware) findIdentityFromSession(c echo.Context) (iamidentity.Verifier, error) { + // Look for "token" query parameter + auth := c.QueryParam("token") + + if len(auth) == 0 { + return nil, nil + } + + p := &jwtgo.Parser{} + token, _, err := p.ParseUnverified(auth, jwtgo.MapClaims{}) + if err != nil { + m.logger.Debug().WithFields(log.Fields{ + "path": c.Request().URL.Path, + "method": c.Request().Method, + }).WithError(err).Log("identity not found") + return nil, err + } + + claims, ok := token.Claims.(jwtgo.MapClaims) + if !ok { + m.logger.Debug().WithFields(log.Fields{ + "path": c.Request().URL.Path, + "method": c.Request().Method, + }).WithError(err).Log("identity not found") + return nil, fmt.Errorf("invalid token. claims") + } + + var subject string + if sub, ok := claims["sub"]; ok { + subject = sub.(string) + } + + identity, err := m.iam.GetVerifier(subject) + if err != nil { + m.logger.Debug().WithFields(log.Fields{ + "path": c.Request().URL.Path, + "method": c.Request().Method, + }).WithError(err).Log("identity not found") + return nil, fmt.Errorf("invalid token, identity") + } + + ok, data, err := identity.VerifyServiceSession(auth) + if !ok { + m.logger.Debug().WithFields(log.Fields{ + "path": c.Request().URL.Path, + "method": c.Request().Method, + }).WithError(err).Log("identity not found") + return nil, fmt.Errorf("invalid token, verify: %w", err) + } + + c.Set("session", data) + + return identity, nil +} + func (m *iammiddleware) findIdentityFromJWT(c echo.Context) (iamidentity.Verifier, error) { // Look for an Auth header values := c.Request().Header.Values("Authorization") @@ -332,18 +394,23 @@ func (m *iammiddleware) findIdentityFromJWT(c echo.Context) (iamidentity.Verifie return nil, err } + claims, ok := token.Claims.(jwtgo.MapClaims) + if !ok { + m.logger.Debug().WithFields(log.Fields{ + "path": c.Request().URL.Path, + "method": c.Request().Method, + }).WithError(err).Log("identity not found") + return nil, fmt.Errorf("invalid token") + } + var subject string - if claims, ok := token.Claims.(jwtgo.MapClaims); ok { - if sub, ok := claims["sub"]; ok { - subject = sub.(string) - } + if sub, ok := claims["sub"]; ok { + subject = sub.(string) } var usefor string - if claims, ok := token.Claims.(jwtgo.MapClaims); ok { - if sub, ok := claims["usefor"]; ok { - usefor = sub.(string) - } + if sub, ok := claims["usefor"]; ok { + usefor = sub.(string) } identity, err := m.iam.GetVerifier(subject) diff --git a/http/middleware/session/HLS.go b/http/middleware/session/HLS.go index 17c719a8..7be7c71f 100644 --- a/http/middleware/session/HLS.go +++ b/http/middleware/session/HLS.go @@ -13,6 +13,9 @@ import ( "strings" "sync" + "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/session" @@ -113,7 +116,9 @@ func (h *hls) handleIngress(c echo.Context, next echo.HandlerFunc) error { // Register a new session reference := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path)) h.ingressCollector.RegisterAndActivate(path, reference, path, "") - h.ingressCollector.Extra(path, req.Header.Get("User-Agent")) + h.ingressCollector.Extra(path, map[string]interface{}{ + "user_agent": req.Header.Get("User-Agent"), + }) } h.ingressCollector.Ingress(path, headerSize(req.Header)) @@ -163,6 +168,8 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { return next(c) } + ctxuser := util.DefaultContext(c, "user", "") + path := req.URL.Path sessionID := c.QueryParam("session") @@ -171,6 +178,46 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { rewrite := false + data := map[string]interface{}{} + + e := util.DefaultContext[interface{}](c, "session", nil) + if e != nil { + var ok bool + data, ok = e.(map[string]interface{}) + if !ok { + return api.Err(http.StatusForbidden, "", "invalid session data, cast") + } + + if match, ok := data["match"].(string); ok { + if ok, err := glob.Match(match, path, '/'); !ok { + if err != nil { + return api.Err(http.StatusForbidden, "", "invalid session data, no match for '%s' in %s: %s", match, path, err.Error()) + } + + return api.Err(http.StatusForbidden, "", "invalid session data, no match for '%s' in %s", match, path) + } + } + + referrer := req.Header.Get("Referer") + if u, err := url.Parse(referrer); err == nil { + referrer = u.Host + } + + if remote, ok := data["remote"].([]string); ok { + match := false + for _, r := range remote { + if referrer == r { + match = true + break + } + } + + if !match { + return api.Err(http.StatusForbidden, "", "invalid session data, remote") + } + } + } + if isM3U8 { if !h.egressCollector.IsKnownSession(sessionID) { if h.egressCollector.IsSessionsExceeded() { @@ -202,13 +249,16 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { } ip, _ := net.AnonymizeIPString(c.RealIP()) - extra := "[" + ip + "] " + req.Header.Get("User-Agent") + + data["ip"] = ip + data["user_agent"] = req.Header.Get("User-Agent") + data["name"] = ctxuser reference := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path)) // Register a new session h.egressCollector.Register(sessionID, reference, path, referrer) - h.egressCollector.Extra(sessionID, extra) + h.egressCollector.Extra(sessionID, data) // Give the new session an initial top bitrate h.egressCollector.SessionSetTopEgressBitrate(sessionID, streamBitrate) @@ -241,7 +291,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { res.Writer = writer if rewrite { - if res.Status != 200 { + if res.Status < 200 || res.Status >= 300 { res.Write(rewriter.buffer.Bytes()) return nil } @@ -254,13 +304,15 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { } if isM3U8 || isTS { - // Collect how many bytes we've written in this session - h.egressCollector.Egress(sessionID, headerSize(res.Header())) - h.egressCollector.Egress(sessionID, res.Size) + if res.Status < 200 || res.Status >= 300 { + // Collect how many bytes we've written in this session + h.egressCollector.Egress(sessionID, headerSize(res.Header())) + h.egressCollector.Egress(sessionID, res.Size) - if isTS { - // Activate the session. If the session is already active, this is a noop - h.egressCollector.Activate(sessionID) + if isTS { + // Activate the session. If the session is already active, this is a noop + h.egressCollector.Activate(sessionID) + } } } @@ -403,7 +455,19 @@ func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL) { continue } - q := u.Query() + q := url.Values{} + + for key, values := range requestURL.Query() { + for _, value := range values { + q.Add(key, value) + } + } + + for key, values := range u.Query() { + for _, value := range values { + q.Set(key, value) + } + } loop := false diff --git a/http/server.go b/http/server.go index 8d53b3ff..8242f636 100644 --- a/http/server.go +++ b/http/server.go @@ -307,6 +307,7 @@ func NewServer(config Config) (Server, error) { s.v3handler.session = api.NewSession( config.Sessions, + config.IAM, ) s.middleware.session = mwsession.NewHLSWithConfig(mwsession.HLSConfig{ EgressCollector: config.Sessions.Collector("hls"), @@ -662,6 +663,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { if s.v3handler.session != nil { v3.GET("/session", s.v3handler.session.Summary) v3.GET("/session/active", s.v3handler.session.Active) + v3.PUT("/session/token/:username", s.v3handler.session.CreateToken) } // v3 Cluster diff --git a/iam/identity/identity.go b/iam/identity/identity.go index 766e4937..4fadf4e7 100644 --- a/iam/identity/identity.go +++ b/iam/identity/identity.go @@ -40,8 +40,9 @@ type UserAuthAPIAuth0 struct { } type UserAuthServices struct { - Basic []string `json:"basic"` - Token []string `json:"token"` + Basic []string `json:"basic"` // Passwords for BasicAuth + Token []string `json:"token"` // Tokens/Streamkey for RTMP and SRT + Session []string `json:"session"` // Secrets for session JWT } func (u *User) validate() error { @@ -79,9 +80,15 @@ func (u *User) marshalIdentity() *identity { func (u *User) clone() User { user := *u + user.Auth.Services.Basic = make([]string, len(u.Auth.Services.Basic)) + copy(user.Auth.Services.Basic, u.Auth.Services.Basic) + user.Auth.Services.Token = make([]string, len(u.Auth.Services.Token)) copy(user.Auth.Services.Token, u.Auth.Services.Token) + user.Auth.Services.Session = make([]string, len(u.Auth.Services.Session)) + copy(user.Auth.Services.Session, u.Auth.Services.Session) + return user } @@ -95,9 +102,11 @@ type Verifier interface { VerifyServiceBasicAuth(password string) (bool, error) VerifyServiceToken(token string) (bool, error) + VerifyServiceSession(jwt string) (bool, interface{}, error) GetServiceBasicAuth() string GetServiceToken() string + GetServiceSession(interface{}) string IsSuperuser() bool } @@ -272,11 +281,7 @@ func (i *identity) VerifyJWT(jwt string) (bool, error) { return false, fmt.Errorf("wrong issuer") } - if token.Method.Alg() != "HS256" { - return false, fmt.Errorf("invalid hashing algorithm") - } - - token, err = jwtgo.Parse(jwt, i.jwtKeyFunc) + token, err = jwtgo.Parse(jwt, i.jwtKeyFunc, jwtgo.WithValidMethods([]string{"HS256"})) if err != nil { return false, err } @@ -364,6 +369,101 @@ func (i *identity) GetServiceToken() string { return "" } +func (i *identity) VerifyServiceSession(jwt string) (bool, interface{}, error) { + i.lock.RLock() + defer i.lock.RUnlock() + + if !i.isValid() { + return false, nil, fmt.Errorf("invalid identity") + } + + if len(i.user.Auth.Services.Session) == 0 { + return false, nil, nil + } + + p := &jwtgo.Parser{} + token, _, err := p.ParseUnverified(jwt, jwtgo.MapClaims{}) + if err != nil { + return false, nil, err + } + + claims, ok := token.Claims.(jwtgo.MapClaims) + if !ok { + return false, nil, fmt.Errorf("invalid claims") + } + + var subject string + if sub, ok := claims["sub"]; ok { + subject = sub.(string) + } + + if subject != i.user.Name { + return false, nil, fmt.Errorf("wrong subject") + } + + var issuer string + if sub, ok := claims["iss"]; ok { + issuer = sub.(string) + } + + if issuer != i.jwtRealm { + return false, nil, fmt.Errorf("wrong issuer") + } + + for _, secret := range i.user.Auth.Services.Session { + fn := func(*jwtgo.Token) (interface{}, error) { return []byte(secret), nil } + token, err = jwtgo.Parse(jwt, fn, jwtgo.WithValidMethods([]string{"HS256"})) + if err == nil { + break + } + } + + if err != nil { + return false, nil, fmt.Errorf("parse: %w", err) + } + + if !token.Valid { + return false, nil, fmt.Errorf("invalid token") + } + + return true, claims["data"], nil +} + +func (i *identity) GetServiceSession(data interface{}) string { + i.lock.RLock() + defer i.lock.RUnlock() + + if !i.isValid() { + return "" + } + + if len(i.user.Auth.Services.Session) == 0 { + return "" + } + + now := time.Now() + accessExpires := now.Add(time.Minute * 10) + + // Create access token + accessToken := jwtgo.NewWithClaims(jwtgo.SigningMethodHS256, jwtgo.MapClaims{ + "iss": i.jwtRealm, + "sub": i.user.Name, + "iat": now.Unix(), + "exp": accessExpires.Unix(), + "exi": uint64(accessExpires.Sub(now).Seconds()), + "jti": uuid.New().String(), + "data": data, + }) + + // Generate encoded access token + at, err := accessToken.SignedString([]byte(i.user.Auth.Services.Session[0])) + if err != nil { + return "" + } + + return at +} + func (i *identity) isValid() bool { return i.valid } diff --git a/session/collector.go b/session/collector.go index f8a473d9..5598007d 100644 --- a/session/collector.go +++ b/session/collector.go @@ -19,9 +19,10 @@ type Session struct { ID string Reference string CreatedAt time.Time + ClosesAt time.Time Location string Peer string - Extra string + Extra map[string]interface{} RxBytes uint64 RxBitrate float64 // bit/s TopRxBitrate float64 // bit/s @@ -81,7 +82,7 @@ type Collector interface { RegisterAndActivate(id, reference, location, peer string) // Add arbitrary extra data to a session - Extra(id, extra string) + Extra(id string, extra map[string]interface{}) // Unregister cancels a session prematurely. Unregister(id string) @@ -581,7 +582,7 @@ func (c *collector) Activate(id string) bool { return false } -func (c *collector) Extra(id, extra string) { +func (c *collector) Extra(id string, extra map[string]interface{}) { c.lock.session.RLock() sess, ok := c.sessions[id] c.lock.session.RUnlock() @@ -924,7 +925,7 @@ func NewNullCollector() Collector func (n *nullCollector) Register(id, reference, location, peer string) {} func (n *nullCollector) Activate(id string) bool { return false } func (n *nullCollector) RegisterAndActivate(id, reference, location, peer string) {} -func (n *nullCollector) Extra(id, extra string) {} +func (n *nullCollector) Extra(id string, extra map[string]interface{}) {} func (n *nullCollector) Unregister(id string) {} func (n *nullCollector) Ingress(id string, size int64) {} func (n *nullCollector) Egress(id string, size int64) {} diff --git a/session/session.go b/session/session.go index 110fdc12..66a1c8a4 100644 --- a/session/session.go +++ b/session/session.go @@ -12,6 +12,7 @@ type session struct { id string reference string createdAt time.Time + closedAt time.Time logger log.Logger @@ -20,7 +21,7 @@ type session struct { location string peer string - extra string + extra map[string]interface{} stale *time.Timer timeout time.Duration @@ -53,6 +54,7 @@ func (s *session) Init(id, reference string, closeCallback func(*session), inact s.location = "" s.peer = "" + s.extra = map[string]interface{}{} s.rxBitrate, _ = average.New(averageWindow, averageGranularity) s.txBitrate, _ = average.New(averageWindow, averageGranularity) @@ -91,6 +93,8 @@ func (s *session) close() { s.stale.Stop() s.lock.Unlock() + s.closedAt = time.Now() + close(s.tickerStop) s.rxBitrate.Stop() s.txBitrate.Stop() @@ -125,7 +129,7 @@ func (s *session) Activate() bool { return true } -func (s *session) Extra(extra string) { +func (s *session) Extra(extra map[string]interface{}) { s.extra = extra s.logger = s.logger.WithField("extra", extra)