mirror of
https://github.com/datarhei/core.git
synced 2025-10-31 03:16:21 +08:00
Add /api/v3/iam/user endpoints
This commit is contained in:
115
rtmp/rtmp.go
115
rtmp/rtmp.go
@@ -106,7 +106,7 @@ func New(config Config) (Server, error) {
|
||||
}
|
||||
|
||||
s := &server{
|
||||
app: config.App,
|
||||
app: filepath.Join("/", config.App),
|
||||
token: config.Token,
|
||||
logger: config.Logger,
|
||||
collector: config.Collector,
|
||||
@@ -184,19 +184,19 @@ func (s *server) Channels() []string {
|
||||
return channels
|
||||
}
|
||||
|
||||
func (s *server) log(who, action, path, message string, client net.Addr) {
|
||||
func (s *server) log(who, what, action, path, message string, client net.Addr) {
|
||||
s.logger.Info().WithFields(log.Fields{
|
||||
"who": who,
|
||||
"what": what,
|
||||
"action": action,
|
||||
"path": path,
|
||||
"client": client.String(),
|
||||
}).Log(message)
|
||||
}
|
||||
|
||||
// GetToken returns the path and the token found in the URL. If the token
|
||||
// was part of the path, the token is removed from the path. The token in
|
||||
// the query string takes precedence. The token in the path is assumed to
|
||||
// be the last path element.
|
||||
// GetToken returns the path without the token and the token found in the URL. If the token
|
||||
// was part of the path, the token is removed from the path. The token in the query string
|
||||
// takes precedence. The token in the path is assumed to be the last path element.
|
||||
func GetToken(u *url.URL) (string, string) {
|
||||
q := u.Query()
|
||||
if q.Has("token") {
|
||||
@@ -204,15 +204,34 @@ func GetToken(u *url.URL) (string, string) {
|
||||
return u.Path, q.Get("token")
|
||||
}
|
||||
|
||||
pathElements := strings.Split(u.EscapedPath(), "/")
|
||||
pathElements := splitPath(u.EscapedPath())
|
||||
nPathElements := len(pathElements)
|
||||
|
||||
if nPathElements == 0 {
|
||||
if nPathElements <= 1 {
|
||||
return u.Path, ""
|
||||
}
|
||||
|
||||
// Return the path without the token
|
||||
return strings.Join(pathElements[:nPathElements-1], "/"), pathElements[nPathElements-1]
|
||||
return "/" + strings.Join(pathElements[:nPathElements-1], "/"), pathElements[nPathElements-1]
|
||||
}
|
||||
|
||||
func splitPath(path string) []string {
|
||||
pathElements := strings.Split(filepath.Clean(path), "/")
|
||||
|
||||
if len(pathElements) == 0 {
|
||||
return pathElements
|
||||
}
|
||||
|
||||
if len(pathElements[0]) == 0 {
|
||||
pathElements = pathElements[1:]
|
||||
}
|
||||
|
||||
return pathElements
|
||||
}
|
||||
|
||||
func removePathPrefix(path, prefix string) (string, string) {
|
||||
prefix = filepath.Join("/", prefix)
|
||||
return filepath.Join("/", strings.TrimPrefix(path, prefix+"/")), prefix
|
||||
}
|
||||
|
||||
// handlePlay is called when a RTMP client wants to play a stream
|
||||
@@ -220,20 +239,22 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
remote := conn.NetConn().RemoteAddr()
|
||||
playPath, token := GetToken(conn.URL)
|
||||
playpath, token := GetToken(conn.URL)
|
||||
|
||||
playpath, _ = removePathPrefix(playpath, s.app)
|
||||
|
||||
identity, err := s.findIdentityFromStreamKey(token)
|
||||
if err != nil {
|
||||
s.logger.Debug().WithError(err).Log("invalid streamkey")
|
||||
s.log("PLAY", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", remote)
|
||||
s.log(identity, "PLAY", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote)
|
||||
return
|
||||
}
|
||||
|
||||
domain := s.findDomainFromPlaypath(playPath)
|
||||
resource := "rtmp:" + playPath
|
||||
domain := s.findDomainFromPlaypath(playpath)
|
||||
resource := "rtmp:" + playpath
|
||||
|
||||
if !s.iam.Enforce(identity, domain, resource, "PLAY") {
|
||||
s.log("PLAY", "FORBIDDEN", playPath, "access denied", remote)
|
||||
s.log(identity, "PLAY", "FORBIDDEN", playpath, "access denied", remote)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -258,14 +279,14 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
|
||||
|
||||
// Look for the stream
|
||||
s.lock.RLock()
|
||||
ch := s.channels[playPath]
|
||||
ch := s.channels[playpath]
|
||||
s.lock.RUnlock()
|
||||
|
||||
if ch != nil {
|
||||
// Send the metadata to the client
|
||||
conn.WriteHeader(ch.streams)
|
||||
|
||||
s.log("PLAY", "START", conn.URL.Path, "", remote)
|
||||
s.log(identity, "PLAY", "START", conn.URL.Path, "", remote)
|
||||
|
||||
// Get a cursor and apply filters
|
||||
cursor := ch.queue.Oldest()
|
||||
@@ -292,9 +313,9 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
|
||||
|
||||
ch.RemoveSubscriber(id)
|
||||
|
||||
s.log("PLAY", "STOP", playPath, "", remote)
|
||||
s.log(identity, "PLAY", "STOP", playpath, "", remote)
|
||||
} else {
|
||||
s.log("PLAY", "NOTFOUND", playPath, "", remote)
|
||||
s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,52 +324,54 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
remote := conn.NetConn().RemoteAddr()
|
||||
playPath, token := GetToken(conn.URL)
|
||||
playpath, token := GetToken(conn.URL)
|
||||
|
||||
// Check the app patch
|
||||
if !strings.HasPrefix(playPath, s.app) {
|
||||
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", remote)
|
||||
return
|
||||
}
|
||||
playpath, app := removePathPrefix(playpath, s.app)
|
||||
|
||||
identity, err := s.findIdentityFromStreamKey(token)
|
||||
if err != nil {
|
||||
s.logger.Debug().WithError(err).Log("invalid streamkey")
|
||||
s.log("PUBLISH", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", remote)
|
||||
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote)
|
||||
return
|
||||
}
|
||||
|
||||
domain := s.findDomainFromPlaypath(playPath)
|
||||
resource := "rtmp:" + playPath
|
||||
// Check the app patch
|
||||
if app != s.app {
|
||||
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "invalid app", remote)
|
||||
return
|
||||
}
|
||||
|
||||
domain := s.findDomainFromPlaypath(playpath)
|
||||
resource := "rtmp:" + playpath
|
||||
|
||||
if !s.iam.Enforce(identity, domain, resource, "PUBLISH") {
|
||||
s.log("PUBLISH", "FORBIDDEN", playPath, "access denied", remote)
|
||||
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "access denied", remote)
|
||||
return
|
||||
}
|
||||
|
||||
err = s.publish(conn, conn.URL, remote, false)
|
||||
err = s.publish(conn, playpath, remote, identity, false)
|
||||
if err != nil {
|
||||
s.logger.WithField("path", conn.URL.Path).WithError(err).Log("")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bool) error {
|
||||
func (s *server) publish(src connection, playpath string, remote net.Addr, identity string, isProxy bool) error {
|
||||
// Check the streams if it contains any valid/known streams
|
||||
streams, _ := src.Streams()
|
||||
|
||||
if len(streams) == 0 {
|
||||
s.log("PUBLISH", "INVALID", u.Path, "no streams available", remote)
|
||||
s.log(identity, "PUBLISH", "INVALID", playpath, "no streams available", remote)
|
||||
return fmt.Errorf("no streams are available")
|
||||
}
|
||||
|
||||
s.lock.Lock()
|
||||
|
||||
ch := s.channels[u.Path]
|
||||
ch := s.channels[playpath]
|
||||
if ch == nil {
|
||||
reference := strings.TrimPrefix(strings.TrimSuffix(u.Path, filepath.Ext(u.Path)), s.app+"/")
|
||||
reference := strings.TrimPrefix(strings.TrimSuffix(playpath, filepath.Ext(playpath)), s.app+"/")
|
||||
|
||||
// Create a new channel
|
||||
ch = newChannel(src, u, reference, remote, streams, isProxy, s.collector)
|
||||
ch = newChannel(src, playpath, reference, remote, streams, isProxy, s.collector)
|
||||
|
||||
for _, stream := range streams {
|
||||
typ := stream.Type()
|
||||
@@ -361,7 +384,7 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo
|
||||
}
|
||||
}
|
||||
|
||||
s.channels[u.Path] = ch
|
||||
s.channels[playpath] = ch
|
||||
} else {
|
||||
ch = nil
|
||||
}
|
||||
@@ -369,26 +392,26 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo
|
||||
s.lock.Unlock()
|
||||
|
||||
if ch == nil {
|
||||
s.log("PUBLISH", "CONFLICT", u.Path, "already publishing", remote)
|
||||
s.log(identity, "PUBLISH", "CONFLICT", playpath, "already publishing", remote)
|
||||
return fmt.Errorf("already publishing")
|
||||
}
|
||||
|
||||
s.log("PUBLISH", "START", u.Path, "", remote)
|
||||
s.log(identity, "PUBLISH", "START", playpath, "", remote)
|
||||
|
||||
for _, stream := range streams {
|
||||
s.log("PUBLISH", "STREAM", u.Path, stream.Type().String(), remote)
|
||||
s.log(identity, "PUBLISH", "STREAM", playpath, stream.Type().String(), remote)
|
||||
}
|
||||
|
||||
// Ingest the data, blocks until done
|
||||
avutil.CopyPackets(ch.queue, src)
|
||||
|
||||
s.lock.Lock()
|
||||
delete(s.channels, u.Path)
|
||||
delete(s.channels, playpath)
|
||||
s.lock.Unlock()
|
||||
|
||||
ch.Close()
|
||||
|
||||
s.log("PUBLISH", "STOP", u.Path, "", remote)
|
||||
s.log(identity, "PUBLISH", "STOP", playpath, "", remote)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -405,10 +428,10 @@ func (s *server) findIdentityFromStreamKey(key string) (string, error) {
|
||||
|
||||
elements := strings.Split(key, ":")
|
||||
if len(elements) == 1 {
|
||||
identity, err = s.iam.GetDefaultIdentity()
|
||||
identity, err = s.iam.GetDefaultVerifier()
|
||||
token = elements[0]
|
||||
} else {
|
||||
identity, err = s.iam.GetIdentity(elements[0])
|
||||
identity, err = s.iam.GetVerifier(elements[0])
|
||||
token = elements[1]
|
||||
}
|
||||
|
||||
@@ -423,10 +446,12 @@ func (s *server) findIdentityFromStreamKey(key string) (string, error) {
|
||||
return identity.Name(), nil
|
||||
}
|
||||
|
||||
// findDomainFromPlaypath finds the domain in the path. The domain is
|
||||
// the first path element. If there's only one path element, it is not
|
||||
// considered the domain. It is assumed that the app is not part of
|
||||
// the provided path.
|
||||
func (s *server) findDomainFromPlaypath(path string) string {
|
||||
path = strings.TrimPrefix(path, filepath.Join(s.app, "/"))
|
||||
|
||||
elements := strings.Split(path, "/")
|
||||
elements := splitPath(path)
|
||||
if len(elements) == 1 {
|
||||
return "$none"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user