diff --git a/app/api/api.go b/app/api/api.go index 94df709e..3b1248ab 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -799,15 +799,60 @@ func (a *api) start() error { }, nil) a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string { - return a.memfs.Metadata("base") + u, _ := url.Parse(a.memfs.Metadata("base")) + + var identity iamidentity.Verifier = nil + + if len(config.Owner) == 0 { + identity = a.iam.GetDefaultVerifier() + } else { + identity, _ = a.iam.GetVerifier(config.Owner) + } + + if identity != nil { + u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) + } + + return u.String() }, nil) a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string { - return a.memfs.Metadata("base") + u, _ := url.Parse(a.memfs.Metadata("base")) + + var identity iamidentity.Verifier = nil + + if len(config.Owner) == 0 { + identity = a.iam.GetDefaultVerifier() + } else { + identity, _ = a.iam.GetVerifier(config.Owner) + } + + if identity != nil { + u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) + } + + return u.String() }, nil) for name, s3 := range a.s3fs { - a.replacer.RegisterTemplate("fs:"+name, s3.Metadata("base"), nil) + s3 := s3 + a.replacer.RegisterTemplateFunc("fs:"+name, func(config *restreamapp.Config, section string) string { + u, _ := url.Parse(s3.Metadata("base")) + + var identity iamidentity.Verifier = nil + + if len(config.Owner) == 0 { + identity = a.iam.GetDefaultVerifier() + } else { + identity, _ = a.iam.GetVerifier(config.Owner) + } + + if identity != nil { + u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) + } + + return u.String() + }, nil) } a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string { diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 85fe13b2..35d0cb64 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -7,7 +7,6 @@ import ( "net" "net/http" "net/url" - "path/filepath" "strings" "sync" "time" @@ -24,8 +23,8 @@ type Node interface { StartFiles(updates chan<- NodeFiles) error StopFiles() - GetURL(path string) (string, error) - GetFile(path string) (io.ReadCloser, error) + GetURL(prefix, path string) (*url.URL, error) + GetFile(prefix, path string) (io.ReadCloser, error) ProcessAdd(*app.Config) error ProcessStart(id string) error @@ -120,15 +119,12 @@ type node struct { runningLock sync.Mutex running bool - host string - secure bool - hasRTMP bool - rtmpAddress string - rtmpToken string - hasSRT bool - srtAddress string - srtPassphrase string - srtToken string + secure bool + httpAddress *url.URL + hasRTMP bool + rtmpAddress *url.URL + hasSRT bool + srtAddress *url.URL } func NewNode(address string) Node { @@ -189,43 +185,41 @@ func (n *node) Connect() error { return fmt.Errorf("failed to convert config to expected version") } + n.httpAddress = u + if config.RTMP.Enable { n.hasRTMP = true - n.rtmpAddress = "rtmp://" + n.rtmpAddress = &url.URL{} + n.rtmpAddress.Scheme = "rtmp:" isHostIP := net.ParseIP(host) != nil address := config.RTMP.Address if n.secure && config.RTMP.EnableTLS && !isHostIP { address = config.RTMP.AddressTLS - n.rtmpAddress = "rtmps://" + n.rtmpAddress.Scheme = "rtmps:" } - _, port, err := net.SplitHostPort(address) - if err != nil { - n.hasRTMP = false - } else { - n.rtmpAddress += host + ":" + port - n.rtmpToken = config.RTMP.Token - } + n.rtmpAddress.Host = address } if config.SRT.Enable { n.hasSRT = true - n.srtAddress = "srt://" + n.srtAddress = &url.URL{} + n.srtAddress.Scheme = "srt:" + n.srtAddress.Host = config.SRT.Address - _, port, err := net.SplitHostPort(config.SRT.Address) - if err != nil { - n.hasSRT = false - } else { - n.srtAddress += host + ":" + port - n.srtPassphrase = config.SRT.Passphrase - n.srtToken = config.SRT.Token + v := url.Values{} + + v.Set("mode", "caller") + if len(config.SRT.Passphrase) != 0 { + v.Set("passphrase", config.SRT.Passphrase) } + + n.srtAddress.RawQuery = v.Encode() } n.ips = addrs - n.host = host n.peer = peer @@ -627,46 +621,57 @@ func (n *node) files() { n.stateLock.Unlock() } -func (n *node) GetURL(path string) (string, error) { - prefix, path, found := strings.Cut(path, ":") - if !found { - return "", fmt.Errorf("no prefix provided") +func cloneURL(src *url.URL) *url.URL { + dst := &url.URL{ + Scheme: src.Scheme, + Opaque: src.Opaque, + User: nil, + Host: src.Host, + Path: src.Path, + RawPath: src.RawPath, + OmitHost: src.OmitHost, + ForceQuery: src.ForceQuery, + RawQuery: src.RawQuery, + Fragment: src.Fragment, + RawFragment: src.RawFragment, } - u := "" + if src.User != nil { + username := src.User.Username() + password, ok := src.User.Password() + + if ok { + dst.User = url.UserPassword(username, password) + } else { + dst.User = url.User(username) + } + } + + return dst +} + +func (n *node) GetURL(prefix, path string) (*url.URL, error) { + var u *url.URL if prefix == "mem" { - u = n.address + "/" + filepath.Join("memfs", path) + u = cloneURL(n.httpAddress) + u.JoinPath("memfs", path) } else if prefix == "disk" { - u = n.address + path + u = cloneURL(n.httpAddress) + u.JoinPath(path) } else if prefix == "rtmp" { - u = n.rtmpAddress + path - if len(n.rtmpToken) != 0 { - u += "?token=" + url.QueryEscape(n.rtmpToken) - } + u = cloneURL(n.rtmpAddress) + u.JoinPath(path) } else if prefix == "srt" { - u = n.srtAddress + "?mode=caller" - if len(n.srtPassphrase) != 0 { - u += "&passphrase=" + url.QueryEscape(n.srtPassphrase) - } - streamid := "#!:m=request,r=" + path - if len(n.srtToken) != 0 { - streamid += ",token=" + n.srtToken - } - u += "&streamid=" + url.QueryEscape(streamid) + u = cloneURL(n.srtAddress) } else { - return "", fmt.Errorf("unknown prefix") + return nil, fmt.Errorf("unknown prefix") } return u, nil } -func (n *node) GetFile(path string) (io.ReadCloser, error) { - prefix, path, found := strings.Cut(path, ":") - if !found { - return nil, fmt.Errorf("no prefix provided") - } - +func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -674,13 +679,7 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { return nil, fmt.Errorf("not connected") } - if prefix == "mem" { - return n.peer.MemFSGetFile(path) - } else if prefix == "disk" { - return n.peer.DiskFSGetFile(path) - } - - return nil, fmt.Errorf("unknown prefix") + return n.peer.FilesystemGetFile(prefix, path) } func (n *node) ProcessList() ([]Process, error) { diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 5d879575..b05a9fa8 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "net/url" "sync" "time" @@ -36,8 +37,8 @@ type ProxyReader interface { Resources() map[string]NodeResources ListProcesses() []Process - GetURL(path string) (string, error) - GetFile(path string) (io.ReadCloser, error) + GetURL(prefix, path string) (*url.URL, error) + GetFile(prefix, path string) (io.ReadCloser, error) } func NewNullProxyReader() ProxyReader { @@ -80,20 +81,20 @@ func (p *proxyReader) ListProcesses() []Process { return p.proxy.ListProcesses() } -func (p *proxyReader) GetURL(path string) (string, error) { - if p.proxy == nil { - return "", fmt.Errorf("no proxy provided") - } - - return p.proxy.GetURL(path) -} - -func (p *proxyReader) GetFile(path string) (io.ReadCloser, error) { +func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { if p.proxy == nil { return nil, fmt.Errorf("no proxy provided") } - return p.proxy.GetFile(path) + return p.proxy.GetURL(prefix, path) +} + +func (p *proxyReader) GetFile(prefix, path string) (io.ReadCloser, error) { + if p.proxy == nil { + return nil, fmt.Errorf("no proxy provided") + } + + return p.proxy.GetFile(prefix, path) } type ProxyConfig struct { @@ -337,78 +338,88 @@ func (p *proxy) GetNode(id string) (NodeReader, error) { return node, nil } -func (c *proxy) GetURL(path string) (string, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - id, ok := c.fileid[path] - if !ok { - c.logger.Debug().WithField("path", path).Log("Not found") - return "", fmt.Errorf("file not found") - } - - ts, ok := c.idupdate[id] - if !ok { - c.logger.Debug().WithField("path", path).Log("No age information found") - return "", fmt.Errorf("file not found") - } - - if time.Since(ts) > 2*time.Second { - c.logger.Debug().WithField("path", path).Log("File too old") - return "", fmt.Errorf("file not found") - } - - node, ok := c.nodes[id] - if !ok { - c.logger.Debug().WithField("path", path).Log("Unknown node") - return "", fmt.Errorf("file not found") - } - - url, err := node.GetURL(path) - if err != nil { - c.logger.Debug().WithField("path", path).Log("Invalid path") - return "", fmt.Errorf("file not found") - } - - c.logger.Debug().WithField("url", url).Log("File cluster url") - - return url, nil -} - -func (p *proxy) GetFile(path string) (io.ReadCloser, error) { +func (p *proxy) GetURL(prefix, path string) (*url.URL, error) { p.lock.RLock() defer p.lock.RUnlock() - id, ok := p.fileid[path] + logger := p.logger.WithFields(log.Fields{ + "path": path, + "prefix": prefix, + }) + + id, ok := p.fileid[prefix+":"+path] if !ok { - p.logger.Debug().WithField("path", path).Log("Not found") + logger.Debug().Log("Not found") return nil, fmt.Errorf("file not found") } ts, ok := p.idupdate[id] if !ok { - p.logger.Debug().WithField("path", path).Log("No age information found") + logger.Debug().Log("No age information found") return nil, fmt.Errorf("file not found") } if time.Since(ts) > 2*time.Second { - p.logger.Debug().WithField("path", path).Log("File too old") + logger.Debug().Log("File too old") return nil, fmt.Errorf("file not found") } node, ok := p.nodes[id] if !ok { - p.logger.Debug().WithField("path", path).Log("Unknown node") + logger.Debug().Log("Unknown node") return nil, fmt.Errorf("file not found") } - data, err := node.GetFile(path) + url, err := node.GetURL(prefix, path) if err != nil { - p.logger.Debug().WithField("path", path).Log("Invalid path") + logger.Debug().Log("Invalid path") return nil, fmt.Errorf("file not found") } - p.logger.Debug().WithField("path", path).Log("File cluster path") + logger.Debug().WithField("url", url).Log("File cluster url") + + return url, nil +} + +func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + logger := p.logger.WithFields(log.Fields{ + "path": path, + "prefix": prefix, + }) + + id, ok := p.fileid[prefix+":"+path] + if !ok { + logger.Debug().Log("Not found") + return nil, fmt.Errorf("file not found") + } + + ts, ok := p.idupdate[id] + if !ok { + logger.Debug().Log("No age information found") + return nil, fmt.Errorf("file not found") + } + + if time.Since(ts) > 2*time.Second { + logger.Debug().Log("File too old") + return nil, fmt.Errorf("file not found") + } + + node, ok := p.nodes[id] + if !ok { + logger.Debug().Log("Unknown node") + return nil, fmt.Errorf("file not found") + } + + data, err := node.GetFile(prefix, path) + if err != nil { + logger.Debug().Log("Invalid path") + return nil, fmt.Errorf("file not found") + } + + logger.Debug().Log("File cluster path") return data, nil } diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 2d1b895b..364a8c92 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -41,7 +41,7 @@ func (fs *filesystem) Open(path string) fs.File { } // Check if the file is available in the cluster - data, err := fs.proxy.GetFile(fs.name + ":" + path) + data, err := fs.proxy.GetFile(fs.name, path) if err != nil { return nil } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 5b6f7952..3250f488 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -277,13 +277,16 @@ func (s *server) handlePlay(conn *rtmp.Conn) { if ch == nil { // Check in the cluster for that stream - url, err := s.proxy.GetURL("rtmp:" + conn.URL.Path) + url, err := s.proxy.GetURL("rtmp", conn.URL.Path) if err != nil { s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) return } - src, err := avutil.Open(url) + url.JoinPath(token) + peerurl := url.String() + + src, err := avutil.Open(peerurl) if err != nil { s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) @@ -296,13 +299,13 @@ func (s *server) handlePlay(conn *rtmp.Conn) { wg.Add(1) go func() { - s.log(identity, "PLAY", "PROXYSTART", url, "", remote) + s.log(identity, "PLAY", "PROXYSTART", peerurl, "", remote) wg.Done() err := s.publish(c, playpath, remote, identity, true) if err != nil { s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") } - s.log(identity, "PLAY", "PROXYSTOP", url, "", remote) + s.log(identity, "PLAY", "PROXYSTOP", peerurl, "", remote) }() // Wait for the goroutine to start diff --git a/srt/srt.go b/srt/srt.go index 2476c37f..b304b630 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -401,23 +401,26 @@ func (s *server) handleSubscribe(conn srt.Conn) { if ch == nil { // Check in the cluster for the stream and proxy it - srturl, err := s.proxy.GetURL("srt:" + si.Resource) + srturl, err := s.proxy.GetURL("srt", si.Resource) if err != nil { s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) return } + peerurl := srturl.String() + config := srt.DefaultConfig() + config.StreamId = streamId config.Latency = 200 * time.Millisecond // This might be a value obtained from the cluster - host, err := config.UnmarshalURL(srturl) + host, err := config.UnmarshalURL(peerurl) if err != nil { - s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed") + s.logger.Error().WithField("address", peerurl).WithError(err).Log("Parsing proxy address failed") s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) return } src, err := srt.Dial("srt", host, config) if err != nil { - s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed") + s.logger.Error().WithField("address", peerurl).WithError(err).Log("Proxying address failed") s.log("SUBSCRIBE", "NOTFOUND", si.Resource, "no publisher for this resource found", client) return } @@ -426,13 +429,13 @@ func (s *server) handleSubscribe(conn srt.Conn) { wg.Add(1) go func() { - s.log("SUBSCRIBE", "PROXYSTART", srturl, "", client) + s.log("SUBSCRIBE", "PROXYSTART", peerurl, "", client) wg.Done() err := s.publish(src, true) if err != nil { s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed") } - s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", srturl, "", client) + s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", peerurl, "", client) }() // Wait for the goroutine to start