Move IAM dependency to rewriter module

This commit is contained in:
Ingo Oppermann
2023-07-10 13:20:24 +02:00
parent 34404a76d2
commit 9462ccfd48
7 changed files with 58 additions and 111 deletions

View File

@@ -884,6 +884,7 @@ func (a *api) start(ctx context.Context) error {
HTTPBase: "http://" + httpBase,
RTMPBase: "rtmp://" + rtmpBase,
SRTBase: "srt://" + srtBase,
IAM: a.iam,
})
if err != nil {
return fmt.Errorf("unable to create url rewriter: %w", err)
@@ -1153,7 +1154,6 @@ func (a *api) start(ctx context.Context) error {
FFmpeg: a.ffmpeg,
MaxProcesses: cfg.FFmpeg.MaxProcesses,
Resources: a.resources,
IAM: a.iam,
Logger: a.log.logger.core.WithComponent("Process"),
})

View File

@@ -17,9 +17,6 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/iam"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream"
"github.com/datarhei/core/v16/restream/app"
@@ -1459,33 +1456,9 @@ func probeInput(binary string, config app.Config) app.Probe {
return app.Probe{}
}
policyAdapter, err := iamaccess.NewJSONAdapter(dummyfs, "./policy.json", nil)
if err != nil {
return app.Probe{}
}
identityAdapter, err := iamidentity.NewJSONAdapter(dummyfs, "./users.json", nil)
if err != nil {
return app.Probe{}
}
iam, _ := iam.New(iam.Config{
PolicyAdapter: policyAdapter,
IdentityAdapter: identityAdapter,
Superuser: iamidentity.User{
Name: "foobar",
},
JWTRealm: "",
JWTSecret: "",
Logger: nil,
})
iam.AddPolicy("$anon", "$none", "process:*", []string{"CREATE", "GET", "DELETE", "PROBE"})
rs, err := restream.New(restream.Config{
FFmpeg: ffmpeg,
Store: store,
IAM: iam,
})
if err != nil {
return app.Probe{}

View File

@@ -16,9 +16,6 @@ import (
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/errorhandler"
"github.com/datarhei/core/v16/http/validator"
"github.com/datarhei/core/v16/iam"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream"
@@ -57,35 +54,10 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) {
return nil, err
}
policyAdapter, err := iamaccess.NewJSONAdapter(memfs, "./policy.json", nil)
if err != nil {
return nil, err
}
identityAdapter, err := iamidentity.NewJSONAdapter(memfs, "./users.json", nil)
if err != nil {
return nil, err
}
iam, err := iam.New(iam.Config{
PolicyAdapter: policyAdapter,
IdentityAdapter: identityAdapter,
Superuser: iamidentity.User{
Name: "foobar",
},
JWTRealm: "",
JWTSecret: "",
Logger: nil,
})
if err != nil {
return nil, err
}
rs, err := restream.New(restream.Config{
Store: store,
FFmpeg: ffmpeg,
Filesystems: []fs.Filesystem{memfs},
IAM: iam,
})
if err != nil {
return nil, err

View File

@@ -16,7 +16,6 @@ import (
"github.com/datarhei/core/v16/ffmpeg/probe"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
@@ -77,7 +76,6 @@ type Config struct {
MaxProcesses int64
Resources resources.Resources
Logger log.Logger
IAM iam.IAM
}
type task struct {
@@ -134,8 +132,6 @@ type restream struct {
startOnce sync.Once
stopOnce sync.Once
iam iam.IAM
}
// New returns a new instance that implements the Restreamer interface
@@ -148,17 +144,12 @@ func New(config Config) (Restreamer, error) {
replace: config.Replace,
rewrite: config.Rewrite,
logger: config.Logger,
iam: config.IAM,
}
if r.logger == nil {
r.logger = log.New("")
}
if r.iam == nil {
return nil, fmt.Errorf("missing IAM")
}
if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := jsonstore.New(jsonstore.Config{
@@ -1059,8 +1050,6 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str
return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address)
}
identity, _ := r.iam.GetVerifier(t.config.Owner)
teeOptions := regexp.MustCompile(`^\[[^\]]*\]`)
for _, x := range t.config.Output {
@@ -1070,7 +1059,7 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str
// Check for non-tee output
if !strings.Contains(x.Address, "|") && !strings.HasPrefix(x.Address, "[") {
return r.rewrite.RewriteAddress(x.Address, identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(x.Address, t.config.Owner, rewrite.READ), nil
}
// Split tee output in its individual addresses
@@ -1086,7 +1075,7 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str
}
if len(matches["source"]) == 0 {
return r.rewrite.RewriteAddress(addresses[0], identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(addresses[0], t.config.Owner, rewrite.READ), nil
}
for _, a := range addresses {
@@ -1098,21 +1087,21 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str
if matches["source"] == "hls" {
if (u.Scheme == "http" || u.Scheme == "https") && strings.HasSuffix(u.RawPath, ".m3u8") {
return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil
}
} else if matches["source"] == "rtmp" {
if u.Scheme == "rtmp" {
return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil
}
} else if matches["source"] == "srt" {
if u.Scheme == "srt" {
return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil
}
}
}
// If none of the sources matched, return the first address
return r.rewrite.RewriteAddress(addresses[0], identity, rewrite.READ), nil
return r.rewrite.RewriteAddress(addresses[0], t.config.Owner, rewrite.READ), nil
}
return address, fmt.Errorf("the process '%s' in group '%s' has no outputs with the ID '%s' (%s)", matches["id"], matches["group"], matches["output"], address)

View File

@@ -71,7 +71,9 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
iam.AddPolicy("$anon", "$none", "process:*", []string{"CREATE", "GET", "DELETE", "UPDATE", "COMMAND", "PROBE", "METADATA", "PLAYOUT"})
rewriter, err := rewrite.New(rewrite.Config{})
rewriter, err := rewrite.New(rewrite.Config{
IAM: iam,
})
if err != nil {
return nil, err
}
@@ -81,7 +83,6 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
Replace: replacer,
Filesystems: []fs.Filesystem{memfs},
Rewrite: rewriter,
IAM: iam,
})
if err != nil {
return nil, err

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"net/url"
"github.com/datarhei/core/v16/iam"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/rtmp"
srturl "github.com/datarhei/core/v16/srt/url"
@@ -21,17 +22,19 @@ type Config struct {
HTTPBase string
RTMPBase string
SRTBase string
IAM iam.IAM
}
// to a new identity, i.e. adjusting the credentials to the given identity.
type Rewriter interface {
RewriteAddress(address string, identity iamidentity.Verifier, mode Access) string
RewriteAddress(address, user string, mode Access) string
}
type rewrite struct {
httpBase string
rtmpBase string
srtBase string
iam iam.IAM
}
func New(config Config) (Rewriter, error) {
@@ -39,12 +42,17 @@ func New(config Config) (Rewriter, error) {
httpBase: config.HTTPBase,
rtmpBase: config.RTMPBase,
srtBase: config.SRTBase,
iam: config.IAM,
}
if r.iam == nil {
return nil, fmt.Errorf("missing IAM")
}
return r, nil
}
func (g *rewrite) RewriteAddress(address string, identity iamidentity.Verifier, mode Access) string {
func (g *rewrite) RewriteAddress(address, user string, mode Access) string {
u, err := url.Parse(address)
if err != nil {
return address
@@ -55,6 +63,8 @@ func (g *rewrite) RewriteAddress(address string, identity iamidentity.Verifier,
return address
}
identity, _ := g.iam.GetVerifier(user)
if identity == nil {
return address
}

View File

@@ -4,16 +4,26 @@ import (
"net/url"
"testing"
"github.com/datarhei/core/v16/iam"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/io/fs"
"github.com/stretchr/testify/require"
)
func getIdentityManager(enableBasic bool) (iamidentity.Manager, error) {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
func getIAM(enableBasic bool) (iam.IAM, error) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
if err != nil {
return nil, err
}
adapter, err := iamidentity.NewJSONAdapter(dummyfs, "./users.json", nil)
policyAdapter, err := iamaccess.NewJSONAdapter(memfs, "./policy.json", nil)
if err != nil {
return nil, err
}
identityAdapter, err := iamidentity.NewJSONAdapter(memfs, "./users.json", nil)
if err != nil {
return nil, err
}
@@ -33,31 +43,32 @@ func getIdentityManager(enableBasic bool) (iamidentity.Manager, error) {
superuser.Auth.Services.Basic = []string{"basicauthpassword"}
}
im, err := iamidentity.New(iamidentity.Config{
Adapter: adapter,
Superuser: superuser,
JWTRealm: "",
JWTSecret: "",
Logger: nil,
iam, err := iam.New(iam.Config{
PolicyAdapter: policyAdapter,
IdentityAdapter: identityAdapter,
Superuser: superuser,
JWTRealm: "",
JWTSecret: "",
Logger: nil,
})
if err != nil {
return nil, err
}
return im, err
return iam, err
}
func TestRewriteHTTP(t *testing.T) {
im, err := getIdentityManager(false)
iam, err := getIAM(false)
require.NoError(t, err)
rewrite, err := New(Config{
HTTPBase: "http://localhost:8080/",
IAM: iam,
})
require.NoError(t, err)
require.NotNil(t, rewrite)
identity, err := im.GetVerifier("foobar")
require.NoError(t, err)
require.NotNil(t, identity)
samples := [][3]string{
{"http://example.com/live/stream.m3u8", "read", "http://example.com/live/stream.m3u8"},
{"http://example.com/live/stream.m3u8", "write", "http://example.com/live/stream.m3u8"},
@@ -70,25 +81,22 @@ func TestRewriteHTTP(t *testing.T) {
}
for _, e := range samples {
rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1]))
rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1]))
require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1])
}
}
func TestRewriteHTTPPassword(t *testing.T) {
im, err := getIdentityManager(true)
iam, err := getIAM(true)
require.NoError(t, err)
rewrite, err := New(Config{
HTTPBase: "http://localhost:8080/",
IAM: iam,
})
require.NoError(t, err)
require.NotNil(t, rewrite)
identity, err := im.GetVerifier("foobar")
require.NoError(t, err)
require.NotNil(t, identity)
samples := [][3]string{
{"http://example.com/live/stream.m3u8", "read", "http://example.com/live/stream.m3u8"},
{"http://example.com/live/stream.m3u8", "write", "http://example.com/live/stream.m3u8"},
@@ -101,25 +109,22 @@ func TestRewriteHTTPPassword(t *testing.T) {
}
for _, e := range samples {
rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1]))
rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1]))
require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1])
}
}
func TestRewriteRTMP(t *testing.T) {
im, err := getIdentityManager(false)
iam, err := getIAM(false)
require.NoError(t, err)
rewrite, err := New(Config{
RTMPBase: "rtmp://localhost:1935/live",
IAM: iam,
})
require.NoError(t, err)
require.NotNil(t, rewrite)
identity, err := im.GetVerifier("foobar")
require.NoError(t, err)
require.NotNil(t, identity)
samples := [][3]string{
{"rtmp://example.com/live/stream", "read", "rtmp://example.com/live/stream"},
{"rtmp://example.com/live/stream", "write", "rtmp://example.com/live/stream"},
@@ -130,25 +135,22 @@ func TestRewriteRTMP(t *testing.T) {
}
for _, e := range samples {
rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1]))
rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1]))
require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1])
}
}
func TestRewriteSRT(t *testing.T) {
im, err := getIdentityManager(false)
iam, err := getIAM(false)
require.NoError(t, err)
rewrite, err := New(Config{
SRTBase: "srt://localhost:6000/",
IAM: iam,
})
require.NoError(t, err)
require.NotNil(t, rewrite)
identity, err := im.GetVerifier("foobar")
require.NoError(t, err)
require.NotNil(t, identity)
samples := [][3]string{
{"srt://example.com/?streamid=stream", "read", "srt://example.com/?streamid=stream"},
{"srt://example.com/?streamid=stream", "write", "srt://example.com/?streamid=stream"},
@@ -161,7 +163,7 @@ func TestRewriteSRT(t *testing.T) {
}
for _, e := range samples {
rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1]))
rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1]))
require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1])
}
}