mirror of
https://github.com/datarhei/core.git
synced 2025-11-03 02:23:42 +08:00
Add policy enforcer for SRT connections
This commit is contained in:
90
srt/srt.go
90
srt/srt.go
@@ -202,6 +202,8 @@ type server struct {
|
||||
srtloggerCancel context.CancelFunc
|
||||
srtlog map[string]*ring.Ring
|
||||
srtlogLock sync.RWMutex
|
||||
|
||||
iam iam.IAM
|
||||
}
|
||||
|
||||
func New(config Config) (Server, error) {
|
||||
@@ -210,6 +212,7 @@ func New(config Config) (Server, error) {
|
||||
token: config.Token,
|
||||
passphrase: config.Passphrase,
|
||||
collector: config.Collector,
|
||||
iam: config.IAM,
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
@@ -537,6 +540,31 @@ func (s *server) handlePublish(conn srt.Conn) {
|
||||
|
||||
si, _ := parseStreamId(streamId)
|
||||
|
||||
identity, err := s.findIdentityFromToken(si.token)
|
||||
if err != nil {
|
||||
s.logger.Debug().WithError(err).Log("no valid identity found")
|
||||
s.log("PUBLISH", "FORBIDDEN", si.resource, "invalid token", client)
|
||||
return
|
||||
}
|
||||
|
||||
domain := s.findDomainFromPlaypath(si.resource)
|
||||
resource := "srt:" + si.resource
|
||||
|
||||
l := s.logger.Debug().WithFields(log.Fields{
|
||||
"name": identity.Name(),
|
||||
"domain": domain,
|
||||
"resource": resource,
|
||||
"action": "PUBLISH",
|
||||
})
|
||||
|
||||
if ok, rule := s.iam.Enforce(identity.Name(), domain, resource, "PUBLISH"); !ok {
|
||||
l.Log("access denied")
|
||||
s.log("PUBLISH", "FORBIDDEN", si.resource, "invalid token", client)
|
||||
return
|
||||
} else {
|
||||
l.Log(rule)
|
||||
}
|
||||
|
||||
// Look for the stream
|
||||
s.lock.Lock()
|
||||
ch := s.channels[si.resource]
|
||||
@@ -575,6 +603,31 @@ func (s *server) handleSubscribe(conn srt.Conn) {
|
||||
|
||||
si, _ := parseStreamId(streamId)
|
||||
|
||||
identity, err := s.findIdentityFromToken(si.token)
|
||||
if err != nil {
|
||||
s.logger.Debug().WithError(err).Log("no valid identity found")
|
||||
s.log("SUBSCRIBE", "FORBIDDEN", si.resource, "invalid token", client)
|
||||
return
|
||||
}
|
||||
|
||||
domain := s.findDomainFromPlaypath(si.resource)
|
||||
resource := "srt:" + si.resource
|
||||
|
||||
l := s.logger.Debug().WithFields(log.Fields{
|
||||
"name": identity.Name(),
|
||||
"domain": domain,
|
||||
"resource": resource,
|
||||
"action": "PLAY",
|
||||
})
|
||||
|
||||
if ok, rule := s.iam.Enforce(identity.Name(), domain, resource, "PLAY"); !ok {
|
||||
l.Log("access denied")
|
||||
s.log("SUBSCRIBE", "FORBIDDEN", si.resource, "invalid token", client)
|
||||
return
|
||||
} else {
|
||||
l.Log(rule)
|
||||
}
|
||||
|
||||
// Look for the stream
|
||||
s.lock.RLock()
|
||||
ch := s.channels[si.resource]
|
||||
@@ -598,3 +651,40 @@ func (s *server) handleSubscribe(conn srt.Conn) {
|
||||
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func (s *server) findIdentityFromToken(key string) (iam.IdentityVerifier, error) {
|
||||
var identity iam.IdentityVerifier
|
||||
var err error
|
||||
|
||||
elements := strings.Split(key, ":")
|
||||
if len(elements) == 1 {
|
||||
identity, err = s.iam.GetDefaultIdentity()
|
||||
} else {
|
||||
identity, err = s.iam.GetIdentity(elements[0])
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid token: %w", err)
|
||||
}
|
||||
|
||||
if ok, err := identity.VerifyServiceToken(elements[1]); !ok {
|
||||
return nil, fmt.Errorf("invalid token: %w", err)
|
||||
}
|
||||
|
||||
return identity, nil
|
||||
}
|
||||
|
||||
func (s *server) findDomainFromPlaypath(path string) string {
|
||||
elements := strings.Split(path, "/")
|
||||
if len(elements) == 1 {
|
||||
return ""
|
||||
}
|
||||
|
||||
domain := elements[0]
|
||||
|
||||
if s.iam.IsDomain(domain) {
|
||||
return domain
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user