Use IAM for placeholders, simplify proxy nodes

This commit is contained in:
Ingo Oppermann
2023-05-31 21:49:25 +02:00
parent 401156e4d2
commit f21ca2055e
6 changed files with 198 additions and 137 deletions

View File

@@ -799,15 +799,60 @@ func (a *api) start() error {
}, nil) }, nil)
a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string { a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string {
return a.memfs.Metadata("base") u, _ := url.Parse(a.memfs.Metadata("base"))
var identity iamidentity.Verifier = nil
if len(config.Owner) == 0 {
identity = a.iam.GetDefaultVerifier()
} else {
identity, _ = a.iam.GetVerifier(config.Owner)
}
if identity != nil {
u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth())
}
return u.String()
}, nil) }, nil)
a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string { a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string {
return a.memfs.Metadata("base") u, _ := url.Parse(a.memfs.Metadata("base"))
var identity iamidentity.Verifier = nil
if len(config.Owner) == 0 {
identity = a.iam.GetDefaultVerifier()
} else {
identity, _ = a.iam.GetVerifier(config.Owner)
}
if identity != nil {
u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth())
}
return u.String()
}, nil) }, nil)
for name, s3 := range a.s3fs { for name, s3 := range a.s3fs {
a.replacer.RegisterTemplate("fs:"+name, s3.Metadata("base"), nil) s3 := s3
a.replacer.RegisterTemplateFunc("fs:"+name, func(config *restreamapp.Config, section string) string {
u, _ := url.Parse(s3.Metadata("base"))
var identity iamidentity.Verifier = nil
if len(config.Owner) == 0 {
identity = a.iam.GetDefaultVerifier()
} else {
identity, _ = a.iam.GetVerifier(config.Owner)
}
if identity != nil {
u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth())
}
return u.String()
}, nil)
} }
a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string { a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string {

View File

@@ -7,7 +7,6 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"path/filepath"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -24,8 +23,8 @@ type Node interface {
StartFiles(updates chan<- NodeFiles) error StartFiles(updates chan<- NodeFiles) error
StopFiles() StopFiles()
GetURL(path string) (string, error) GetURL(prefix, path string) (*url.URL, error)
GetFile(path string) (io.ReadCloser, error) GetFile(prefix, path string) (io.ReadCloser, error)
ProcessAdd(*app.Config) error ProcessAdd(*app.Config) error
ProcessStart(id string) error ProcessStart(id string) error
@@ -120,15 +119,12 @@ type node struct {
runningLock sync.Mutex runningLock sync.Mutex
running bool running bool
host string
secure bool secure bool
httpAddress *url.URL
hasRTMP bool hasRTMP bool
rtmpAddress string rtmpAddress *url.URL
rtmpToken string
hasSRT bool hasSRT bool
srtAddress string srtAddress *url.URL
srtPassphrase string
srtToken string
} }
func NewNode(address string) Node { func NewNode(address string) Node {
@@ -189,43 +185,41 @@ func (n *node) Connect() error {
return fmt.Errorf("failed to convert config to expected version") return fmt.Errorf("failed to convert config to expected version")
} }
n.httpAddress = u
if config.RTMP.Enable { if config.RTMP.Enable {
n.hasRTMP = true n.hasRTMP = true
n.rtmpAddress = "rtmp://" n.rtmpAddress = &url.URL{}
n.rtmpAddress.Scheme = "rtmp:"
isHostIP := net.ParseIP(host) != nil isHostIP := net.ParseIP(host) != nil
address := config.RTMP.Address address := config.RTMP.Address
if n.secure && config.RTMP.EnableTLS && !isHostIP { if n.secure && config.RTMP.EnableTLS && !isHostIP {
address = config.RTMP.AddressTLS address = config.RTMP.AddressTLS
n.rtmpAddress = "rtmps://" n.rtmpAddress.Scheme = "rtmps:"
} }
_, port, err := net.SplitHostPort(address) n.rtmpAddress.Host = address
if err != nil {
n.hasRTMP = false
} else {
n.rtmpAddress += host + ":" + port
n.rtmpToken = config.RTMP.Token
}
} }
if config.SRT.Enable { if config.SRT.Enable {
n.hasSRT = true n.hasSRT = true
n.srtAddress = "srt://" n.srtAddress = &url.URL{}
n.srtAddress.Scheme = "srt:"
n.srtAddress.Host = config.SRT.Address
_, port, err := net.SplitHostPort(config.SRT.Address) v := url.Values{}
if err != nil {
n.hasSRT = false v.Set("mode", "caller")
} else { if len(config.SRT.Passphrase) != 0 {
n.srtAddress += host + ":" + port v.Set("passphrase", config.SRT.Passphrase)
n.srtPassphrase = config.SRT.Passphrase
n.srtToken = config.SRT.Token
} }
n.srtAddress.RawQuery = v.Encode()
} }
n.ips = addrs n.ips = addrs
n.host = host
n.peer = peer n.peer = peer
@@ -627,46 +621,57 @@ func (n *node) files() {
n.stateLock.Unlock() n.stateLock.Unlock()
} }
func (n *node) GetURL(path string) (string, error) { func cloneURL(src *url.URL) *url.URL {
prefix, path, found := strings.Cut(path, ":") dst := &url.URL{
if !found { Scheme: src.Scheme,
return "", fmt.Errorf("no prefix provided") Opaque: src.Opaque,
User: nil,
Host: src.Host,
Path: src.Path,
RawPath: src.RawPath,
OmitHost: src.OmitHost,
ForceQuery: src.ForceQuery,
RawQuery: src.RawQuery,
Fragment: src.Fragment,
RawFragment: src.RawFragment,
} }
u := "" if src.User != nil {
username := src.User.Username()
password, ok := src.User.Password()
if ok {
dst.User = url.UserPassword(username, password)
} else {
dst.User = url.User(username)
}
}
return dst
}
func (n *node) GetURL(prefix, path string) (*url.URL, error) {
var u *url.URL
if prefix == "mem" { if prefix == "mem" {
u = n.address + "/" + filepath.Join("memfs", path) u = cloneURL(n.httpAddress)
u.JoinPath("memfs", path)
} else if prefix == "disk" { } else if prefix == "disk" {
u = n.address + path u = cloneURL(n.httpAddress)
u.JoinPath(path)
} else if prefix == "rtmp" { } else if prefix == "rtmp" {
u = n.rtmpAddress + path u = cloneURL(n.rtmpAddress)
if len(n.rtmpToken) != 0 { u.JoinPath(path)
u += "?token=" + url.QueryEscape(n.rtmpToken)
}
} else if prefix == "srt" { } else if prefix == "srt" {
u = n.srtAddress + "?mode=caller" u = cloneURL(n.srtAddress)
if len(n.srtPassphrase) != 0 {
u += "&passphrase=" + url.QueryEscape(n.srtPassphrase)
}
streamid := "#!:m=request,r=" + path
if len(n.srtToken) != 0 {
streamid += ",token=" + n.srtToken
}
u += "&streamid=" + url.QueryEscape(streamid)
} else { } else {
return "", fmt.Errorf("unknown prefix") return nil, fmt.Errorf("unknown prefix")
} }
return u, nil return u, nil
} }
func (n *node) GetFile(path string) (io.ReadCloser, error) { func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) {
prefix, path, found := strings.Cut(path, ":")
if !found {
return nil, fmt.Errorf("no prefix provided")
}
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
@@ -674,13 +679,7 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("not connected") return nil, fmt.Errorf("not connected")
} }
if prefix == "mem" { return n.peer.FilesystemGetFile(prefix, path)
return n.peer.MemFSGetFile(path)
} else if prefix == "disk" {
return n.peer.DiskFSGetFile(path)
}
return nil, fmt.Errorf("unknown prefix")
} }
func (n *node) ProcessList() ([]Process, error) { func (n *node) ProcessList() ([]Process, error) {

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/url"
"sync" "sync"
"time" "time"
@@ -36,8 +37,8 @@ type ProxyReader interface {
Resources() map[string]NodeResources Resources() map[string]NodeResources
ListProcesses() []Process ListProcesses() []Process
GetURL(path string) (string, error) GetURL(prefix, path string) (*url.URL, error)
GetFile(path string) (io.ReadCloser, error) GetFile(prefix, path string) (io.ReadCloser, error)
} }
func NewNullProxyReader() ProxyReader { func NewNullProxyReader() ProxyReader {
@@ -80,20 +81,20 @@ func (p *proxyReader) ListProcesses() []Process {
return p.proxy.ListProcesses() return p.proxy.ListProcesses()
} }
func (p *proxyReader) GetURL(path string) (string, error) { func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) {
if p.proxy == nil {
return "", fmt.Errorf("no proxy provided")
}
return p.proxy.GetURL(path)
}
func (p *proxyReader) GetFile(path string) (io.ReadCloser, error) {
if p.proxy == nil { if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided") return nil, fmt.Errorf("no proxy provided")
} }
return p.proxy.GetFile(path) return p.proxy.GetURL(prefix, path)
}
func (p *proxyReader) GetFile(prefix, path string) (io.ReadCloser, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetFile(prefix, path)
} }
type ProxyConfig struct { type ProxyConfig struct {
@@ -337,78 +338,88 @@ func (p *proxy) GetNode(id string) (NodeReader, error) {
return node, nil return node, nil
} }
func (c *proxy) GetURL(path string) (string, error) { func (p *proxy) GetURL(prefix, path string) (*url.URL, error) {
c.lock.RLock()
defer c.lock.RUnlock()
id, ok := c.fileid[path]
if !ok {
c.logger.Debug().WithField("path", path).Log("Not found")
return "", fmt.Errorf("file not found")
}
ts, ok := c.idupdate[id]
if !ok {
c.logger.Debug().WithField("path", path).Log("No age information found")
return "", fmt.Errorf("file not found")
}
if time.Since(ts) > 2*time.Second {
c.logger.Debug().WithField("path", path).Log("File too old")
return "", fmt.Errorf("file not found")
}
node, ok := c.nodes[id]
if !ok {
c.logger.Debug().WithField("path", path).Log("Unknown node")
return "", fmt.Errorf("file not found")
}
url, err := node.GetURL(path)
if err != nil {
c.logger.Debug().WithField("path", path).Log("Invalid path")
return "", fmt.Errorf("file not found")
}
c.logger.Debug().WithField("url", url).Log("File cluster url")
return url, nil
}
func (p *proxy) GetFile(path string) (io.ReadCloser, error) {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
id, ok := p.fileid[path] logger := p.logger.WithFields(log.Fields{
"path": path,
"prefix": prefix,
})
id, ok := p.fileid[prefix+":"+path]
if !ok { if !ok {
p.logger.Debug().WithField("path", path).Log("Not found") logger.Debug().Log("Not found")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
ts, ok := p.idupdate[id] ts, ok := p.idupdate[id]
if !ok { if !ok {
p.logger.Debug().WithField("path", path).Log("No age information found") logger.Debug().Log("No age information found")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
if time.Since(ts) > 2*time.Second { if time.Since(ts) > 2*time.Second {
p.logger.Debug().WithField("path", path).Log("File too old") logger.Debug().Log("File too old")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
node, ok := p.nodes[id] node, ok := p.nodes[id]
if !ok { if !ok {
p.logger.Debug().WithField("path", path).Log("Unknown node") logger.Debug().Log("Unknown node")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
data, err := node.GetFile(path) url, err := node.GetURL(prefix, path)
if err != nil { if err != nil {
p.logger.Debug().WithField("path", path).Log("Invalid path") logger.Debug().Log("Invalid path")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
p.logger.Debug().WithField("path", path).Log("File cluster path") logger.Debug().WithField("url", url).Log("File cluster url")
return url, nil
}
func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) {
p.lock.RLock()
defer p.lock.RUnlock()
logger := p.logger.WithFields(log.Fields{
"path": path,
"prefix": prefix,
})
id, ok := p.fileid[prefix+":"+path]
if !ok {
logger.Debug().Log("Not found")
return nil, fmt.Errorf("file not found")
}
ts, ok := p.idupdate[id]
if !ok {
logger.Debug().Log("No age information found")
return nil, fmt.Errorf("file not found")
}
if time.Since(ts) > 2*time.Second {
logger.Debug().Log("File too old")
return nil, fmt.Errorf("file not found")
}
node, ok := p.nodes[id]
if !ok {
logger.Debug().Log("Unknown node")
return nil, fmt.Errorf("file not found")
}
data, err := node.GetFile(prefix, path)
if err != nil {
logger.Debug().Log("Invalid path")
return nil, fmt.Errorf("file not found")
}
logger.Debug().Log("File cluster path")
return data, nil return data, nil
} }

View File

@@ -41,7 +41,7 @@ func (fs *filesystem) Open(path string) fs.File {
} }
// Check if the file is available in the cluster // Check if the file is available in the cluster
data, err := fs.proxy.GetFile(fs.name + ":" + path) data, err := fs.proxy.GetFile(fs.name, path)
if err != nil { if err != nil {
return nil return nil
} }

View File

@@ -277,13 +277,16 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
if ch == nil { if ch == nil {
// Check in the cluster for that stream // Check in the cluster for that stream
url, err := s.proxy.GetURL("rtmp:" + conn.URL.Path) url, err := s.proxy.GetURL("rtmp", conn.URL.Path)
if err != nil { if err != nil {
s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote)
return return
} }
src, err := avutil.Open(url) url.JoinPath(token)
peerurl := url.String()
src, err := avutil.Open(peerurl)
if err != nil { if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote)
@@ -296,13 +299,13 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
wg.Add(1) wg.Add(1)
go func() { go func() {
s.log(identity, "PLAY", "PROXYSTART", url, "", remote) s.log(identity, "PLAY", "PROXYSTART", peerurl, "", remote)
wg.Done() wg.Done()
err := s.publish(c, playpath, remote, identity, true) err := s.publish(c, playpath, remote, identity, true)
if err != nil { if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
} }
s.log(identity, "PLAY", "PROXYSTOP", url, "", remote) s.log(identity, "PLAY", "PROXYSTOP", peerurl, "", remote)
}() }()
// Wait for the goroutine to start // Wait for the goroutine to start

View File

@@ -401,23 +401,26 @@ func (s *server) handleSubscribe(conn srt.Conn) {
if ch == nil { if ch == nil {
// Check in the cluster for the stream and proxy it // Check in the cluster for the stream and proxy it
srturl, err := s.proxy.GetURL("srt:" + si.Resource) srturl, err := s.proxy.GetURL("srt", si.Resource)
if err != nil { if err != nil {
s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client)
return return
} }
peerurl := srturl.String()
config := srt.DefaultConfig() config := srt.DefaultConfig()
config.StreamId = streamId
config.Latency = 200 * time.Millisecond // This might be a value obtained from the cluster config.Latency = 200 * time.Millisecond // This might be a value obtained from the cluster
host, err := config.UnmarshalURL(srturl) host, err := config.UnmarshalURL(peerurl)
if err != nil { if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed") s.logger.Error().WithField("address", peerurl).WithError(err).Log("Parsing proxy address failed")
s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client)
return return
} }
src, err := srt.Dial("srt", host, config) src, err := srt.Dial("srt", host, config)
if err != nil { if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed") s.logger.Error().WithField("address", peerurl).WithError(err).Log("Proxying address failed")
s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client)
return return
} }
@@ -426,13 +429,13 @@ func (s *server) handleSubscribe(conn srt.Conn) {
wg.Add(1) wg.Add(1)
go func() { go func() {
s.log("SUBSCRIBE", "PROXYSTART", srturl, "", client) s.log("SUBSCRIBE", "PROXYSTART", peerurl, "", client)
wg.Done() wg.Done()
err := s.publish(src, true) err := s.publish(src, true)
if err != nil { if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed") s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed")
} }
s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", srturl, "", client) s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", peerurl, "", client)
}() }()
// Wait for the goroutine to start // Wait for the goroutine to start