WIP: add session token, missing: writing sessions to log

This commit is contained in:
Ingo Oppermann
2023-06-14 12:30:50 +02:00
parent 98a307c174
commit 50e943a075
18 changed files with 682 additions and 126 deletions

View File

@@ -926,12 +926,21 @@ const docTemplateClusterAPI = `{
"type": "object",
"properties": {
"basic": {
"description": "Passwords for BasicAuth",
"type": "array",
"items": {
"type": "string"
}
},
"session": {
"description": "Secrets for session JWT",
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"description": "Tokens/Streamkey for RTMP and SRT",
"type": "array",
"items": {
"type": "string"

View File

@@ -918,12 +918,21 @@
"type": "object",
"properties": {
"basic": {
"description": "Passwords for BasicAuth",
"type": "array",
"items": {
"type": "string"
}
},
"session": {
"description": "Secrets for session JWT",
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"description": "Tokens/Streamkey for RTMP and SRT",
"type": "array",
"items": {
"type": "string"

View File

@@ -186,10 +186,17 @@ definitions:
identity.UserAuthServices:
properties:
basic:
description: Passwords for BasicAuth
items:
type: string
type: array
session:
description: Secrets for session JWT
items:
type: string
type: array
token:
description: Tokens/Streamkey for RTMP and SRT
items:
type: string
type: array

View File

@@ -3493,6 +3493,77 @@ const docTemplate = `{
}
}
},
"/api/v3/session/token/{username}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Request access tokens",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Request access tokens",
"operationId": "session-3-create-token",
"parameters": [
{
"type": "string",
"description": "Username",
"name": "username",
"in": "path",
"required": true
},
{
"description": "Token request",
"name": "config",
"in": "body",
"required": true,
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.SessionTokenRequest"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.SessionTokenRequest"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/skills": {
"get": {
"security": [
@@ -4084,13 +4155,10 @@ const docTemplate = `{
"description": "ip:port",
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"emergency_leader_timeout": {
"emergency_leader_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4098,7 +4166,7 @@ const docTemplate = `{
"enable": {
"type": "boolean"
},
"node_recover_timeout": {
"node_recover_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4109,10 +4177,7 @@ const docTemplate = `{
"type": "string"
}
},
"recover": {
"type": "boolean"
},
"sync_interval": {
"sync_interval_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4842,6 +4907,12 @@ const docTemplate = `{
"type": "string"
}
},
"session": {
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"type": "array",
"items": {
@@ -6023,7 +6094,8 @@ const docTemplate = `{
"format": "int64"
},
"extra": {
"type": "string"
"type": "object",
"additionalProperties": true
},
"id": {
"type": "string"
@@ -6160,6 +6232,27 @@ const docTemplate = `{
}
}
},
"api.SessionTokenRequest": {
"type": "object",
"properties": {
"extra": {
"type": "object",
"additionalProperties": true
},
"match": {
"type": "string"
},
"remote": {
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"type": "string"
}
}
},
"api.SessionsActive": {
"type": "object",
"additionalProperties": {
@@ -6274,13 +6367,10 @@ const docTemplate = `{
"description": "ip:port",
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"emergency_leader_timeout": {
"emergency_leader_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -6288,7 +6378,7 @@ const docTemplate = `{
"enable": {
"type": "boolean"
},
"node_recover_timeout": {
"node_recover_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -6299,10 +6389,7 @@ const docTemplate = `{
"type": "string"
}
},
"recover": {
"type": "boolean"
},
"sync_interval": {
"sync_interval_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"

View File

@@ -3485,6 +3485,77 @@
}
}
},
"/api/v3/session/token/{username}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Request access tokens",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Request access tokens",
"operationId": "session-3-create-token",
"parameters": [
{
"type": "string",
"description": "Username",
"name": "username",
"in": "path",
"required": true
},
{
"description": "Token request",
"name": "config",
"in": "body",
"required": true,
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.SessionTokenRequest"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.SessionTokenRequest"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/skills": {
"get": {
"security": [
@@ -4076,13 +4147,10 @@
"description": "ip:port",
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"emergency_leader_timeout": {
"emergency_leader_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4090,7 +4158,7 @@
"enable": {
"type": "boolean"
},
"node_recover_timeout": {
"node_recover_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4101,10 +4169,7 @@
"type": "string"
}
},
"recover": {
"type": "boolean"
},
"sync_interval": {
"sync_interval_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -4834,6 +4899,12 @@
"type": "string"
}
},
"session": {
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"type": "array",
"items": {
@@ -6015,7 +6086,8 @@
"format": "int64"
},
"extra": {
"type": "string"
"type": "object",
"additionalProperties": true
},
"id": {
"type": "string"
@@ -6152,6 +6224,27 @@
}
}
},
"api.SessionTokenRequest": {
"type": "object",
"properties": {
"extra": {
"type": "object",
"additionalProperties": true
},
"match": {
"type": "string"
},
"remote": {
"type": "array",
"items": {
"type": "string"
}
},
"token": {
"type": "string"
}
}
},
"api.SessionsActive": {
"type": "object",
"additionalProperties": {
@@ -6266,13 +6359,10 @@
"description": "ip:port",
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"emergency_leader_timeout": {
"emergency_leader_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -6280,7 +6370,7 @@
"enable": {
"type": "boolean"
},
"node_recover_timeout": {
"node_recover_timeout_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"
@@ -6291,10 +6381,7 @@
"type": "string"
}
},
"recover": {
"type": "boolean"
},
"sync_interval": {
"sync_interval_sec": {
"description": "seconds",
"type": "integer",
"format": "int64"

View File

@@ -274,17 +274,15 @@ definitions:
address:
description: ip:port
type: string
bootstrap:
type: boolean
debug:
type: boolean
emergency_leader_timeout:
emergency_leader_timeout_sec:
description: seconds
format: int64
type: integer
enable:
type: boolean
node_recover_timeout:
node_recover_timeout_sec:
description: seconds
format: int64
type: integer
@@ -292,9 +290,7 @@ definitions:
items:
type: string
type: array
recover:
type: boolean
sync_interval:
sync_interval_sec:
description: seconds
format: int64
type: integer
@@ -782,6 +778,10 @@ definitions:
items:
type: string
type: array
session:
items:
type: string
type: array
token:
items:
type: string
@@ -1652,7 +1652,8 @@ definitions:
format: int64
type: integer
extra:
type: string
additionalProperties: true
type: object
id:
type: string
local:
@@ -1746,6 +1747,20 @@ definitions:
format: uint64
type: integer
type: object
api.SessionTokenRequest:
properties:
extra:
additionalProperties: true
type: object
match:
type: string
remote:
items:
type: string
type: array
token:
type: string
type: object
api.SessionsActive:
additionalProperties:
items:
@@ -1820,17 +1835,15 @@ definitions:
address:
description: ip:port
type: string
bootstrap:
type: boolean
debug:
type: boolean
emergency_leader_timeout:
emergency_leader_timeout_sec:
description: seconds
format: int64
type: integer
enable:
type: boolean
node_recover_timeout:
node_recover_timeout_sec:
description: seconds
format: int64
type: integer
@@ -1838,9 +1851,7 @@ definitions:
items:
type: string
type: array
recover:
type: boolean
sync_interval:
sync_interval_sec:
description: seconds
format: int64
type: integer
@@ -4653,6 +4664,52 @@ paths:
summary: Get a minimal summary of all active sessions
tags:
- v16.7.2
/api/v3/session/token/{username}:
put:
consumes:
- application/json
description: Request access tokens
operationId: session-3-create-token
parameters:
- description: Username
in: path
name: username
required: true
type: string
- description: Token request
in: body
name: config
required: true
schema:
items:
$ref: '#/definitions/api.SessionTokenRequest'
type: array
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.SessionTokenRequest'
type: array
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"403":
description: Forbidden
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Request access tokens
tags:
- v16.?.?
/api/v3/skills:
get:
description: List all detected FFmpeg capabilities.

View File

@@ -33,7 +33,7 @@ func (w *wrappedCollector) RegisterAndActivate(id, reference, location, peer str
w.Collector.RegisterAndActivate(w.prefix+id, w.reference, location, peer)
}
func (w *wrappedCollector) Extra(id, extra string) {
func (w *wrappedCollector) Extra(id string, extra map[string]interface{}) {
w.Collector.Extra(w.prefix+id, extra)
}

View File

@@ -30,6 +30,7 @@ func (u *IAMUser) Marshal(user identity.User, policies []access.Policy) {
Services: IAMUserAuthServices{
Basic: user.Auth.Services.Basic,
Token: user.Auth.Services.Token,
Session: user.Auth.Services.Session,
},
}
@@ -61,6 +62,7 @@ func (u *IAMUser) Unmarshal() (identity.User, []access.Policy) {
Services: identity.UserAuthServices{
Basic: u.Auth.Services.Basic,
Token: u.Auth.Services.Token,
Session: u.Auth.Services.Session,
},
},
}
@@ -97,6 +99,7 @@ type IAMUserAuthAPIAuth0 struct {
type IAMUserAuthServices struct {
Basic []string `json:"basic"`
Token []string `json:"token"`
Session []string `json:"session"`
}
type IAMAuth0Tenant struct {

View File

@@ -27,7 +27,7 @@ type Session struct {
CreatedAt int64 `json:"created_at" format:"int64"`
Location string `json:"local"`
Peer string `json:"remote"`
Extra string `json:"extra"`
Extra map[string]interface{} `json:"extra"`
RxBytes uint64 `json:"bytes_rx" format:"uint64"`
TxBytes uint64 `json:"bytes_tx" format:"uint64"`
RxBitrate json.Number `json:"bandwidth_rx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s
@@ -140,3 +140,10 @@ func (summary *SessionSummary) Unmarshal(sum session.Summary) {
summary.Summary.TotalRxBytes = sum.Summary.TotalRxBytes / 1024 / 1024
summary.Summary.TotalTxBytes = sum.Summary.TotalTxBytes / 1024 / 1024
}
type SessionTokenRequest struct {
Match string `json:"match"`
Remote []string `json:"remote"`
Extra map[string]interface{} `json:"extra"`
Token string `json:"token,omitempty"`
}

View File

@@ -56,31 +56,31 @@ func (h *RestreamHandler) Add(c echo.Context) error {
}
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
return api.Err(http.StatusBadRequest, "", "Invalid JSON: %s", err)
}
if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
return api.Err(http.StatusForbidden, "", "You are not allowed to write this process")
}
if !superuser {
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
return api.Err(http.StatusForbidden, "", "The owner '%s' is not allowed to write this process", process.Owner)
}
}
if process.Type != "ffmpeg" {
return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg")
return api.Err(http.StatusBadRequest, "", "Unsupported process type, supported process types are: ffmpeg")
}
if len(process.Input) == 0 || len(process.Output) == 0 {
return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined")
return api.Err(http.StatusBadRequest, "", "At least one input and one output need to be defined")
}
config, metadata := process.Marshal()
if err := h.restream.AddProcess(config); err != nil {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
return api.Err(http.StatusBadRequest, "", "Invalid process config: %s", err.Error())
}
tid := app.ProcessID{
@@ -272,7 +272,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
}
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
return api.Err(http.StatusForbidden, "", "You are not allowed to write this process: %s", id)
}
tid := app.ProcessID{
@@ -293,12 +293,12 @@ func (h *RestreamHandler) Update(c echo.Context) error {
}
if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
return api.Err(http.StatusForbidden, "", "You are not allowed to write this process: %s", process.ID)
}
if !superuser {
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
return api.Err(http.StatusForbidden, "", "The owner '%s' is not allowed to write this process: %s", process.Owner, process.ID)
}
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/session"
"github.com/labstack/echo/v4"
@@ -14,12 +15,14 @@ import (
// The SessionHandler type provides handlers to retrieve session information
type SessionHandler struct {
registry session.RegistryReader
iam iam.IAM
}
// NewSession returns a new Session type. You have to provide a session registry.
func NewSession(registry session.RegistryReader) *SessionHandler {
func NewSession(registry session.RegistryReader, iam iam.IAM) *SessionHandler {
return &SessionHandler{
registry: registry,
iam: iam,
}
}
@@ -77,3 +80,52 @@ func (s *SessionHandler) Active(c echo.Context) error {
return c.JSON(http.StatusOK, sessionsActive)
}
// Request access tokens
// @Summary Request access tokens
// @Description Request access tokens
// @Tags v16.?.?
// @ID session-3-create-token
// @Accept json
// @Produce json
// @Param username path string true "Username"
// @Param config body []api.SessionTokenRequest true "Token request"
// @Success 200 {array} api.SessionTokenRequest
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/session/token/{username} [put]
func (s *SessionHandler) CreateToken(c echo.Context) error {
username := util.PathParam(c, "username")
identity, err := s.iam.GetVerifier(username)
if err != nil {
return api.Err(http.StatusNotFound, "", "%s", err)
}
request := []api.SessionTokenRequest{}
if err := util.ShouldBindJSONValidation(c, &request, false); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
for _, r := range request {
err := c.Validate(r)
if err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
}
for i, req := range request {
data := map[string]interface{}{
"match": req.Match,
"remote": req.Remote,
"extra": req.Extra,
}
request[i].Token = identity.GetServiceSession(data)
}
return c.JSON(http.StatusOK, request)
}

View File

@@ -20,7 +20,7 @@ func getDummySessionRouter() *echo.Echo {
collector := registry.Collector("foo")
collector.RegisterAndActivate("foobar", "", "any", "any")
handler := NewSession(registry)
handler := NewSession(registry, nil)
router.Add("GET", "/summary", handler.Summary)
router.Add("GET", "/active", handler.Active)

View File

@@ -185,6 +185,12 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
domain = c.QueryParam("domain")
resource = "api:" + resource
} else {
identity, err = mw.findIdentityFromSession(c)
if err != nil {
return api.Err(http.StatusForbidden, "Forbidden", "%s", err)
}
if identity == nil {
identity, err = mw.findIdentityFromBasicAuth(c)
if err != nil {
if err == ErrAuthRequired {
@@ -205,6 +211,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
}
}
}
}
domain = mw.findDomainFromFilesystem(resource)
resource = "fs:" + resource
@@ -239,7 +246,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
}
}
var ErrAuthRequired = errors.New("unauthorized")
var ErrAuthRequired = errors.New("authentication required")
var ErrUnauthorized = errors.New("unauthorized")
var ErrBadRequest = errors.New("bad request")
@@ -302,6 +309,61 @@ func (m *iammiddleware) findIdentityFromBasicAuth(c echo.Context) (iamidentity.V
return identity, nil
}
func (m *iammiddleware) findIdentityFromSession(c echo.Context) (iamidentity.Verifier, error) {
// Look for "token" query parameter
auth := c.QueryParam("token")
if len(auth) == 0 {
return nil, nil
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(auth, jwtgo.MapClaims{})
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, err
}
claims, ok := token.Claims.(jwtgo.MapClaims)
if !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token. claims")
}
var subject string
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
identity, err := m.iam.GetVerifier(subject)
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token, identity")
}
ok, data, err := identity.VerifyServiceSession(auth)
if !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token, verify: %w", err)
}
c.Set("session", data)
return identity, nil
}
func (m *iammiddleware) findIdentityFromJWT(c echo.Context) (iamidentity.Verifier, error) {
// Look for an Auth header
values := c.Request().Header.Values("Authorization")
@@ -332,19 +394,24 @@ func (m *iammiddleware) findIdentityFromJWT(c echo.Context) (iamidentity.Verifie
return nil, err
}
claims, ok := token.Claims.(jwtgo.MapClaims)
if !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token")
}
var subject string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
}
var usefor string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["usefor"]; ok {
usefor = sub.(string)
}
}
identity, err := m.iam.GetVerifier(subject)
if err != nil {

View File

@@ -13,6 +13,9 @@ import (
"strings"
"sync"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/session"
@@ -113,7 +116,9 @@ func (h *hls) handleIngress(c echo.Context, next echo.HandlerFunc) error {
// Register a new session
reference := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path))
h.ingressCollector.RegisterAndActivate(path, reference, path, "")
h.ingressCollector.Extra(path, req.Header.Get("User-Agent"))
h.ingressCollector.Extra(path, map[string]interface{}{
"user_agent": req.Header.Get("User-Agent"),
})
}
h.ingressCollector.Ingress(path, headerSize(req.Header))
@@ -163,6 +168,8 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
return next(c)
}
ctxuser := util.DefaultContext(c, "user", "")
path := req.URL.Path
sessionID := c.QueryParam("session")
@@ -171,6 +178,46 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
rewrite := false
data := map[string]interface{}{}
e := util.DefaultContext[interface{}](c, "session", nil)
if e != nil {
var ok bool
data, ok = e.(map[string]interface{})
if !ok {
return api.Err(http.StatusForbidden, "", "invalid session data, cast")
}
if match, ok := data["match"].(string); ok {
if ok, err := glob.Match(match, path, '/'); !ok {
if err != nil {
return api.Err(http.StatusForbidden, "", "invalid session data, no match for '%s' in %s: %s", match, path, err.Error())
}
return api.Err(http.StatusForbidden, "", "invalid session data, no match for '%s' in %s", match, path)
}
}
referrer := req.Header.Get("Referer")
if u, err := url.Parse(referrer); err == nil {
referrer = u.Host
}
if remote, ok := data["remote"].([]string); ok {
match := false
for _, r := range remote {
if referrer == r {
match = true
break
}
}
if !match {
return api.Err(http.StatusForbidden, "", "invalid session data, remote")
}
}
}
if isM3U8 {
if !h.egressCollector.IsKnownSession(sessionID) {
if h.egressCollector.IsSessionsExceeded() {
@@ -202,13 +249,16 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
}
ip, _ := net.AnonymizeIPString(c.RealIP())
extra := "[" + ip + "] " + req.Header.Get("User-Agent")
data["ip"] = ip
data["user_agent"] = req.Header.Get("User-Agent")
data["name"] = ctxuser
reference := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path))
// Register a new session
h.egressCollector.Register(sessionID, reference, path, referrer)
h.egressCollector.Extra(sessionID, extra)
h.egressCollector.Extra(sessionID, data)
// Give the new session an initial top bitrate
h.egressCollector.SessionSetTopEgressBitrate(sessionID, streamBitrate)
@@ -241,7 +291,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
res.Writer = writer
if rewrite {
if res.Status != 200 {
if res.Status < 200 || res.Status >= 300 {
res.Write(rewriter.buffer.Bytes())
return nil
}
@@ -254,6 +304,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
}
if isM3U8 || isTS {
if res.Status < 200 || res.Status >= 300 {
// Collect how many bytes we've written in this session
h.egressCollector.Egress(sessionID, headerSize(res.Header()))
h.egressCollector.Egress(sessionID, res.Size)
@@ -263,6 +314,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
h.egressCollector.Activate(sessionID)
}
}
}
return nil
}
@@ -403,7 +455,19 @@ func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL) {
continue
}
q := u.Query()
q := url.Values{}
for key, values := range requestURL.Query() {
for _, value := range values {
q.Add(key, value)
}
}
for key, values := range u.Query() {
for _, value := range values {
q.Set(key, value)
}
}
loop := false

View File

@@ -307,6 +307,7 @@ func NewServer(config Config) (Server, error) {
s.v3handler.session = api.NewSession(
config.Sessions,
config.IAM,
)
s.middleware.session = mwsession.NewHLSWithConfig(mwsession.HLSConfig{
EgressCollector: config.Sessions.Collector("hls"),
@@ -662,6 +663,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
if s.v3handler.session != nil {
v3.GET("/session", s.v3handler.session.Summary)
v3.GET("/session/active", s.v3handler.session.Active)
v3.PUT("/session/token/:username", s.v3handler.session.CreateToken)
}
// v3 Cluster

View File

@@ -40,8 +40,9 @@ type UserAuthAPIAuth0 struct {
}
type UserAuthServices struct {
Basic []string `json:"basic"`
Token []string `json:"token"`
Basic []string `json:"basic"` // Passwords for BasicAuth
Token []string `json:"token"` // Tokens/Streamkey for RTMP and SRT
Session []string `json:"session"` // Secrets for session JWT
}
func (u *User) validate() error {
@@ -79,9 +80,15 @@ func (u *User) marshalIdentity() *identity {
func (u *User) clone() User {
user := *u
user.Auth.Services.Basic = make([]string, len(u.Auth.Services.Basic))
copy(user.Auth.Services.Basic, u.Auth.Services.Basic)
user.Auth.Services.Token = make([]string, len(u.Auth.Services.Token))
copy(user.Auth.Services.Token, u.Auth.Services.Token)
user.Auth.Services.Session = make([]string, len(u.Auth.Services.Session))
copy(user.Auth.Services.Session, u.Auth.Services.Session)
return user
}
@@ -95,9 +102,11 @@ type Verifier interface {
VerifyServiceBasicAuth(password string) (bool, error)
VerifyServiceToken(token string) (bool, error)
VerifyServiceSession(jwt string) (bool, interface{}, error)
GetServiceBasicAuth() string
GetServiceToken() string
GetServiceSession(interface{}) string
IsSuperuser() bool
}
@@ -272,11 +281,7 @@ func (i *identity) VerifyJWT(jwt string) (bool, error) {
return false, fmt.Errorf("wrong issuer")
}
if token.Method.Alg() != "HS256" {
return false, fmt.Errorf("invalid hashing algorithm")
}
token, err = jwtgo.Parse(jwt, i.jwtKeyFunc)
token, err = jwtgo.Parse(jwt, i.jwtKeyFunc, jwtgo.WithValidMethods([]string{"HS256"}))
if err != nil {
return false, err
}
@@ -364,6 +369,101 @@ func (i *identity) GetServiceToken() string {
return ""
}
func (i *identity) VerifyServiceSession(jwt string) (bool, interface{}, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return false, nil, fmt.Errorf("invalid identity")
}
if len(i.user.Auth.Services.Session) == 0 {
return false, nil, nil
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(jwt, jwtgo.MapClaims{})
if err != nil {
return false, nil, err
}
claims, ok := token.Claims.(jwtgo.MapClaims)
if !ok {
return false, nil, fmt.Errorf("invalid claims")
}
var subject string
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
if subject != i.user.Name {
return false, nil, fmt.Errorf("wrong subject")
}
var issuer string
if sub, ok := claims["iss"]; ok {
issuer = sub.(string)
}
if issuer != i.jwtRealm {
return false, nil, fmt.Errorf("wrong issuer")
}
for _, secret := range i.user.Auth.Services.Session {
fn := func(*jwtgo.Token) (interface{}, error) { return []byte(secret), nil }
token, err = jwtgo.Parse(jwt, fn, jwtgo.WithValidMethods([]string{"HS256"}))
if err == nil {
break
}
}
if err != nil {
return false, nil, fmt.Errorf("parse: %w", err)
}
if !token.Valid {
return false, nil, fmt.Errorf("invalid token")
}
return true, claims["data"], nil
}
func (i *identity) GetServiceSession(data interface{}) string {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return ""
}
if len(i.user.Auth.Services.Session) == 0 {
return ""
}
now := time.Now()
accessExpires := now.Add(time.Minute * 10)
// Create access token
accessToken := jwtgo.NewWithClaims(jwtgo.SigningMethodHS256, jwtgo.MapClaims{
"iss": i.jwtRealm,
"sub": i.user.Name,
"iat": now.Unix(),
"exp": accessExpires.Unix(),
"exi": uint64(accessExpires.Sub(now).Seconds()),
"jti": uuid.New().String(),
"data": data,
})
// Generate encoded access token
at, err := accessToken.SignedString([]byte(i.user.Auth.Services.Session[0]))
if err != nil {
return ""
}
return at
}
func (i *identity) isValid() bool {
return i.valid
}

View File

@@ -19,9 +19,10 @@ type Session struct {
ID string
Reference string
CreatedAt time.Time
ClosesAt time.Time
Location string
Peer string
Extra string
Extra map[string]interface{}
RxBytes uint64
RxBitrate float64 // bit/s
TopRxBitrate float64 // bit/s
@@ -81,7 +82,7 @@ type Collector interface {
RegisterAndActivate(id, reference, location, peer string)
// Add arbitrary extra data to a session
Extra(id, extra string)
Extra(id string, extra map[string]interface{})
// Unregister cancels a session prematurely.
Unregister(id string)
@@ -581,7 +582,7 @@ func (c *collector) Activate(id string) bool {
return false
}
func (c *collector) Extra(id, extra string) {
func (c *collector) Extra(id string, extra map[string]interface{}) {
c.lock.session.RLock()
sess, ok := c.sessions[id]
c.lock.session.RUnlock()
@@ -924,7 +925,7 @@ func NewNullCollector() Collector
func (n *nullCollector) Register(id, reference, location, peer string) {}
func (n *nullCollector) Activate(id string) bool { return false }
func (n *nullCollector) RegisterAndActivate(id, reference, location, peer string) {}
func (n *nullCollector) Extra(id, extra string) {}
func (n *nullCollector) Extra(id string, extra map[string]interface{}) {}
func (n *nullCollector) Unregister(id string) {}
func (n *nullCollector) Ingress(id string, size int64) {}
func (n *nullCollector) Egress(id string, size int64) {}

View File

@@ -12,6 +12,7 @@ type session struct {
id string
reference string
createdAt time.Time
closedAt time.Time
logger log.Logger
@@ -20,7 +21,7 @@ type session struct {
location string
peer string
extra string
extra map[string]interface{}
stale *time.Timer
timeout time.Duration
@@ -53,6 +54,7 @@ func (s *session) Init(id, reference string, closeCallback func(*session), inact
s.location = ""
s.peer = ""
s.extra = map[string]interface{}{}
s.rxBitrate, _ = average.New(averageWindow, averageGranularity)
s.txBitrate, _ = average.New(averageWindow, averageGranularity)
@@ -91,6 +93,8 @@ func (s *session) close() {
s.stale.Stop()
s.lock.Unlock()
s.closedAt = time.Now()
close(s.tickerStop)
s.rxBitrate.Stop()
s.txBitrate.Stop()
@@ -125,7 +129,7 @@ func (s *session) Activate() bool {
return true
}
func (s *session) Extra(extra string) {
func (s *session) Extra(extra map[string]interface{}) {
s.extra = extra
s.logger = s.logger.WithField("extra", extra)