From 9462ccfd4832e8921f2096dfcae281051b5b95c9 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 10 Jul 2023 13:20:24 +0200 Subject: [PATCH] Move IAM dependency to rewriter module --- app/api/api.go | 2 +- app/import/import.go | 27 ------------ http/mock/mock.go | 28 ------------- restream/restream.go | 23 +++-------- restream/restream_test.go | 5 ++- restream/rewrite/rewrite.go | 14 ++++++- restream/rewrite/rewrite_test.go | 70 ++++++++++++++++---------------- 7 files changed, 58 insertions(+), 111 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 1bedaf3f..f55f0368 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -884,6 +884,7 @@ func (a *api) start(ctx context.Context) error { HTTPBase: "http://" + httpBase, RTMPBase: "rtmp://" + rtmpBase, SRTBase: "srt://" + srtBase, + IAM: a.iam, }) if err != nil { return fmt.Errorf("unable to create url rewriter: %w", err) @@ -1153,7 +1154,6 @@ func (a *api) start(ctx context.Context) error { FFmpeg: a.ffmpeg, MaxProcesses: cfg.FFmpeg.MaxProcesses, Resources: a.resources, - IAM: a.iam, Logger: a.log.logger.core.WithComponent("Process"), }) diff --git a/app/import/import.go b/app/import/import.go index 6905f4bf..1e291a82 100644 --- a/app/import/import.go +++ b/app/import/import.go @@ -17,9 +17,6 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/ffmpeg/skills" - "github.com/datarhei/core/v16/iam" - iamaccess "github.com/datarhei/core/v16/iam/access" - iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/restream/app" @@ -1459,33 +1456,9 @@ func probeInput(binary string, config app.Config) app.Probe { return app.Probe{} } - policyAdapter, err := iamaccess.NewJSONAdapter(dummyfs, "./policy.json", nil) - if err != nil { - return app.Probe{} - } - - identityAdapter, err := iamidentity.NewJSONAdapter(dummyfs, "./users.json", nil) - if err != nil { - return app.Probe{} - } - - iam, _ := iam.New(iam.Config{ - PolicyAdapter: policyAdapter, - IdentityAdapter: identityAdapter, - Superuser: iamidentity.User{ - Name: "foobar", - }, - JWTRealm: "", - JWTSecret: "", - Logger: nil, - }) - - iam.AddPolicy("$anon", "$none", "process:*", []string{"CREATE", "GET", "DELETE", "PROBE"}) - rs, err := restream.New(restream.Config{ FFmpeg: ffmpeg, Store: store, - IAM: iam, }) if err != nil { return app.Probe{} diff --git a/http/mock/mock.go b/http/mock/mock.go index db7eece5..566d548a 100644 --- a/http/mock/mock.go +++ b/http/mock/mock.go @@ -16,9 +16,6 @@ import ( "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/errorhandler" "github.com/datarhei/core/v16/http/validator" - "github.com/datarhei/core/v16/iam" - iamaccess "github.com/datarhei/core/v16/iam/access" - iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/restream" @@ -57,35 +54,10 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) { return nil, err } - policyAdapter, err := iamaccess.NewJSONAdapter(memfs, "./policy.json", nil) - if err != nil { - return nil, err - } - - identityAdapter, err := iamidentity.NewJSONAdapter(memfs, "./users.json", nil) - if err != nil { - return nil, err - } - - iam, err := iam.New(iam.Config{ - PolicyAdapter: policyAdapter, - IdentityAdapter: identityAdapter, - Superuser: iamidentity.User{ - Name: "foobar", - }, - JWTRealm: "", - JWTSecret: "", - Logger: nil, - }) - if err != nil { - return nil, err - } - rs, err := restream.New(restream.Config{ Store: store, FFmpeg: ffmpeg, Filesystems: []fs.Filesystem{memfs}, - IAM: iam, }) if err != nil { return nil, err diff --git a/restream/restream.go b/restream/restream.go index b270e081..37301ef5 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -16,7 +16,6 @@ import ( "github.com/datarhei/core/v16/ffmpeg/probe" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" - "github.com/datarhei/core/v16/iam" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" @@ -77,7 +76,6 @@ type Config struct { MaxProcesses int64 Resources resources.Resources Logger log.Logger - IAM iam.IAM } type task struct { @@ -134,8 +132,6 @@ type restream struct { startOnce sync.Once stopOnce sync.Once - - iam iam.IAM } // New returns a new instance that implements the Restreamer interface @@ -148,17 +144,12 @@ func New(config Config) (Restreamer, error) { replace: config.Replace, rewrite: config.Rewrite, logger: config.Logger, - iam: config.IAM, } if r.logger == nil { r.logger = log.New("") } - if r.iam == nil { - return nil, fmt.Errorf("missing IAM") - } - if r.store == nil { dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) s, err := jsonstore.New(jsonstore.Config{ @@ -1059,8 +1050,6 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address) } - identity, _ := r.iam.GetVerifier(t.config.Owner) - teeOptions := regexp.MustCompile(`^\[[^\]]*\]`) for _, x := range t.config.Output { @@ -1070,7 +1059,7 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str // Check for non-tee output if !strings.Contains(x.Address, "|") && !strings.HasPrefix(x.Address, "[") { - return r.rewrite.RewriteAddress(x.Address, identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(x.Address, t.config.Owner, rewrite.READ), nil } // Split tee output in its individual addresses @@ -1086,7 +1075,7 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str } if len(matches["source"]) == 0 { - return r.rewrite.RewriteAddress(addresses[0], identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(addresses[0], t.config.Owner, rewrite.READ), nil } for _, a := range addresses { @@ -1098,21 +1087,21 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str if matches["source"] == "hls" { if (u.Scheme == "http" || u.Scheme == "https") && strings.HasSuffix(u.RawPath, ".m3u8") { - return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil } } else if matches["source"] == "rtmp" { if u.Scheme == "rtmp" { - return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil } } else if matches["source"] == "srt" { if u.Scheme == "srt" { - return r.rewrite.RewriteAddress(a, identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(a, t.config.Owner, rewrite.READ), nil } } } // If none of the sources matched, return the first address - return r.rewrite.RewriteAddress(addresses[0], identity, rewrite.READ), nil + return r.rewrite.RewriteAddress(addresses[0], t.config.Owner, rewrite.READ), nil } return address, fmt.Errorf("the process '%s' in group '%s' has no outputs with the ID '%s' (%s)", matches["id"], matches["group"], matches["output"], address) diff --git a/restream/restream_test.go b/restream/restream_test.go index 5726a37d..ecf5cdf6 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -71,7 +71,9 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp iam.AddPolicy("$anon", "$none", "process:*", []string{"CREATE", "GET", "DELETE", "UPDATE", "COMMAND", "PROBE", "METADATA", "PLAYOUT"}) - rewriter, err := rewrite.New(rewrite.Config{}) + rewriter, err := rewrite.New(rewrite.Config{ + IAM: iam, + }) if err != nil { return nil, err } @@ -81,7 +83,6 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp Replace: replacer, Filesystems: []fs.Filesystem{memfs}, Rewrite: rewriter, - IAM: iam, }) if err != nil { return nil, err diff --git a/restream/rewrite/rewrite.go b/restream/rewrite/rewrite.go index fd598c3d..a40cabf3 100644 --- a/restream/rewrite/rewrite.go +++ b/restream/rewrite/rewrite.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" + "github.com/datarhei/core/v16/iam" iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/rtmp" srturl "github.com/datarhei/core/v16/srt/url" @@ -21,17 +22,19 @@ type Config struct { HTTPBase string RTMPBase string SRTBase string + IAM iam.IAM } // to a new identity, i.e. adjusting the credentials to the given identity. type Rewriter interface { - RewriteAddress(address string, identity iamidentity.Verifier, mode Access) string + RewriteAddress(address, user string, mode Access) string } type rewrite struct { httpBase string rtmpBase string srtBase string + iam iam.IAM } func New(config Config) (Rewriter, error) { @@ -39,12 +42,17 @@ func New(config Config) (Rewriter, error) { httpBase: config.HTTPBase, rtmpBase: config.RTMPBase, srtBase: config.SRTBase, + iam: config.IAM, + } + + if r.iam == nil { + return nil, fmt.Errorf("missing IAM") } return r, nil } -func (g *rewrite) RewriteAddress(address string, identity iamidentity.Verifier, mode Access) string { +func (g *rewrite) RewriteAddress(address, user string, mode Access) string { u, err := url.Parse(address) if err != nil { return address @@ -55,6 +63,8 @@ func (g *rewrite) RewriteAddress(address string, identity iamidentity.Verifier, return address } + identity, _ := g.iam.GetVerifier(user) + if identity == nil { return address } diff --git a/restream/rewrite/rewrite_test.go b/restream/rewrite/rewrite_test.go index 92c2c504..59f26358 100644 --- a/restream/rewrite/rewrite_test.go +++ b/restream/rewrite/rewrite_test.go @@ -4,16 +4,26 @@ import ( "net/url" "testing" + "github.com/datarhei/core/v16/iam" + iamaccess "github.com/datarhei/core/v16/iam/access" iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/io/fs" "github.com/stretchr/testify/require" ) -func getIdentityManager(enableBasic bool) (iamidentity.Manager, error) { - dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) +func getIAM(enableBasic bool) (iam.IAM, error) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + if err != nil { + return nil, err + } - adapter, err := iamidentity.NewJSONAdapter(dummyfs, "./users.json", nil) + policyAdapter, err := iamaccess.NewJSONAdapter(memfs, "./policy.json", nil) + if err != nil { + return nil, err + } + + identityAdapter, err := iamidentity.NewJSONAdapter(memfs, "./users.json", nil) if err != nil { return nil, err } @@ -33,31 +43,32 @@ func getIdentityManager(enableBasic bool) (iamidentity.Manager, error) { superuser.Auth.Services.Basic = []string{"basicauthpassword"} } - im, err := iamidentity.New(iamidentity.Config{ - Adapter: adapter, - Superuser: superuser, - JWTRealm: "", - JWTSecret: "", - Logger: nil, + iam, err := iam.New(iam.Config{ + PolicyAdapter: policyAdapter, + IdentityAdapter: identityAdapter, + Superuser: superuser, + JWTRealm: "", + JWTSecret: "", + Logger: nil, }) + if err != nil { + return nil, err + } - return im, err + return iam, err } func TestRewriteHTTP(t *testing.T) { - im, err := getIdentityManager(false) + iam, err := getIAM(false) require.NoError(t, err) rewrite, err := New(Config{ HTTPBase: "http://localhost:8080/", + IAM: iam, }) require.NoError(t, err) require.NotNil(t, rewrite) - identity, err := im.GetVerifier("foobar") - require.NoError(t, err) - require.NotNil(t, identity) - samples := [][3]string{ {"http://example.com/live/stream.m3u8", "read", "http://example.com/live/stream.m3u8"}, {"http://example.com/live/stream.m3u8", "write", "http://example.com/live/stream.m3u8"}, @@ -70,25 +81,22 @@ func TestRewriteHTTP(t *testing.T) { } for _, e := range samples { - rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1])) + rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1])) require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1]) } } func TestRewriteHTTPPassword(t *testing.T) { - im, err := getIdentityManager(true) + iam, err := getIAM(true) require.NoError(t, err) rewrite, err := New(Config{ HTTPBase: "http://localhost:8080/", + IAM: iam, }) require.NoError(t, err) require.NotNil(t, rewrite) - identity, err := im.GetVerifier("foobar") - require.NoError(t, err) - require.NotNil(t, identity) - samples := [][3]string{ {"http://example.com/live/stream.m3u8", "read", "http://example.com/live/stream.m3u8"}, {"http://example.com/live/stream.m3u8", "write", "http://example.com/live/stream.m3u8"}, @@ -101,25 +109,22 @@ func TestRewriteHTTPPassword(t *testing.T) { } for _, e := range samples { - rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1])) + rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1])) require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1]) } } func TestRewriteRTMP(t *testing.T) { - im, err := getIdentityManager(false) + iam, err := getIAM(false) require.NoError(t, err) rewrite, err := New(Config{ RTMPBase: "rtmp://localhost:1935/live", + IAM: iam, }) require.NoError(t, err) require.NotNil(t, rewrite) - identity, err := im.GetVerifier("foobar") - require.NoError(t, err) - require.NotNil(t, identity) - samples := [][3]string{ {"rtmp://example.com/live/stream", "read", "rtmp://example.com/live/stream"}, {"rtmp://example.com/live/stream", "write", "rtmp://example.com/live/stream"}, @@ -130,25 +135,22 @@ func TestRewriteRTMP(t *testing.T) { } for _, e := range samples { - rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1])) + rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1])) require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1]) } } func TestRewriteSRT(t *testing.T) { - im, err := getIdentityManager(false) + iam, err := getIAM(false) require.NoError(t, err) rewrite, err := New(Config{ SRTBase: "srt://localhost:6000/", + IAM: iam, }) require.NoError(t, err) require.NotNil(t, rewrite) - identity, err := im.GetVerifier("foobar") - require.NoError(t, err) - require.NotNil(t, identity) - samples := [][3]string{ {"srt://example.com/?streamid=stream", "read", "srt://example.com/?streamid=stream"}, {"srt://example.com/?streamid=stream", "write", "srt://example.com/?streamid=stream"}, @@ -161,7 +163,7 @@ func TestRewriteSRT(t *testing.T) { } for _, e := range samples { - rewritten := rewrite.RewriteAddress(e[0], identity, Access(e[1])) + rewritten := rewrite.RewriteAddress(e[0], "foobar", Access(e[1])) require.Equal(t, e[2], rewritten, "%s %s", e[0], e[1]) } }