Get files from diskfs and memfs via API

This commit is contained in:
Ingo Oppermann
2022-08-16 14:32:58 +03:00
parent dfc12ab9fb
commit 50164ea78e
7 changed files with 108 additions and 35 deletions

View File

@@ -39,12 +39,14 @@ type RestClient interface {
ConfigReload() error // GET /config/reload
DiskFSList(sort, order string) ([]api.FileInfo, error) // GET /fs/disk
DiskFSHasFile(path string) bool // GET /fs/disk/{path}
DiskFSHasFile(path string) bool // HEAD /fs/disk/{path}
DiskFSGetFile(path string) (io.ReadCloser, error) // GET /fs/disk/{path}
DiskFSDeleteFile(path string) error // DELETE /fs/disk/{path}
DiskFSAddFile(path string, data io.Reader) error // PUT /fs/disk/{path}
MemFSList(sort, order string) ([]api.FileInfo, error) // GET /fs/mem
MemFSHasFile(path string) bool // GET /fs/mem/{path}
MemFSHasFile(path string) bool // HEAD /fs/mem/{path}
MemFSGetFile(path string) (io.ReadCloser, error) // GET /fs/mem/{path}
MemFSDeleteFile(path string) error // DELETE /fs/mem/{path}
MemFSAddFile(path string, data io.Reader) error // PUT /fs/mem/{path}
@@ -235,9 +237,11 @@ func (r *restclient) login() error {
return fmt.Errorf("wrong username and/or password")
}
data, _ := io.ReadAll(body)
jwt := api.JWT{}
json.Unmarshal(body, &jwt)
json.Unmarshal(data, &jwt)
r.accessToken = jwt.AccessToken
r.refreshToken = jwt.RefreshToken
@@ -277,9 +281,11 @@ func (r *restclient) refresh() error {
return fmt.Errorf("invalid refresh token")
}
data, _ := io.ReadAll(body)
jwt := api.JWTRefresh{}
json.Unmarshal(body, &jwt)
json.Unmarshal(data, &jwt)
r.accessToken = jwt.AccessToken
@@ -305,14 +311,16 @@ func (r *restclient) info() (api.About, error) {
return api.About{}, fmt.Errorf("access to API failed (%d)", status)
}
data, _ := io.ReadAll(body)
about := api.About{}
json.Unmarshal(body, &about)
json.Unmarshal(data, &about)
return about, nil
}
func (r *restclient) call(method, path, contentType string, data io.Reader) ([]byte, error) {
func (r *restclient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) {
req, err := http.NewRequest(method, r.address+r.prefix+"/v3"+path, data)
if err != nil {
return nil, err
@@ -345,7 +353,11 @@ func (r *restclient) call(method, path, contentType string, data io.Reader) ([]b
if status < 200 || status >= 300 {
e := api.Error{}
json.Unmarshal(body, &e)
defer body.Close()
x, _ := io.ReadAll(body)
json.Unmarshal(x, &e)
return nil, fmt.Errorf("%w", e)
}
@@ -353,15 +365,24 @@ func (r *restclient) call(method, path, contentType string, data io.Reader) ([]b
return body, nil
}
func (r *restclient) request(req *http.Request) (int, []byte, error) {
func (r *restclient) call(method, path, contentType string, data io.Reader) ([]byte, error) {
body, err := r.stream(method, path, contentType, data)
if err != nil {
return nil, err
}
defer body.Close()
x, _ := io.ReadAll(body)
return x, nil
}
func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) {
resp, err := r.client.Do(req)
if err != nil {
return -1, nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
return resp.StatusCode, body, nil
return resp.StatusCode, resp.Body, nil
}

View File

@@ -37,11 +37,15 @@ func (r *restclient) DiskFSList(sort, order string) ([]api.FileInfo, error) {
}
func (r *restclient) DiskFSHasFile(path string) bool {
_, err := r.call("GET", "/fs/disk"+path, "", nil)
_, err := r.call("HEAD", "/fs/disk"+path, "", nil)
return err == nil
}
func (r *restclient) DiskFSGetFile(path string) (io.ReadCloser, error) {
return r.stream("GET", "/fs/disk"+path, "", nil)
}
func (r *restclient) DiskFSDeleteFile(path string) error {
_, err := r.call("DELETE", "/fs/disk"+path, "", nil)

View File

@@ -26,11 +26,15 @@ func (r *restclient) MemFSList(sort, order string) ([]api.FileInfo, error) {
}
func (r *restclient) MemFSHasFile(path string) bool {
_, err := r.call("GET", "/fs/mem"+path, "", nil)
_, err := r.call("HEAD", "/fs/mem"+path, "", nil)
return err == nil
}
func (r *restclient) MemFSGetFile(path string) (io.ReadCloser, error) {
return r.stream("GET", "/fs/mem"+path, "", nil)
}
func (r *restclient) MemFSDeleteFile(path string) error {
_, err := r.call("DELETE", "/fs/mem"+path, "", nil)

View File

@@ -3,6 +3,7 @@ package cluster
import (
"context"
"fmt"
"io"
"sync"
"time"
@@ -12,6 +13,7 @@ import (
type ClusterReader interface {
GetURL(path string) (string, error)
GetFile(path string) (io.ReadCloser, error)
}
type dummyClusterReader struct{}
@@ -24,6 +26,10 @@ func (r *dummyClusterReader) GetURL(path string) (string, error) {
return "", fmt.Errorf("not implemented in dummy cluster")
}
func (r *dummyClusterReader) GetFile(path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("not implemented in dummy cluster")
}
type Cluster interface {
AddNode(address, username, password string) (string, error)
RemoveNode(id string) error
@@ -240,7 +246,7 @@ func (c *cluster) GetURL(path string) (string, error) {
return "", fmt.Errorf("file not found")
}
url, err := node.GetURL(path)
url, err := node.getURL(path)
if err != nil {
c.logger.Debug().WithField("path", path).Log("Invalid path")
return "", fmt.Errorf("file not found")
@@ -250,3 +256,41 @@ func (c *cluster) GetURL(path string) (string, error) {
return url, nil
}
func (c *cluster) GetFile(path string) (io.ReadCloser, error) {
c.lock.RLock()
defer c.lock.RUnlock()
id, ok := c.fileid[path]
if !ok {
c.logger.Debug().WithField("path", path).Log("Not found")
return nil, fmt.Errorf("file not found")
}
ts, ok := c.idupdate[id]
if !ok {
c.logger.Debug().WithField("path", path).Log("No age information found")
return nil, fmt.Errorf("file not found")
}
if time.Since(ts) > 2*time.Second {
c.logger.Debug().WithField("path", path).Log("File too old")
return nil, fmt.Errorf("file not found")
}
node, ok := c.nodes[id]
if !ok {
c.logger.Debug().WithField("path", path).Log("Unknown node")
return nil, fmt.Errorf("file not found")
}
data, err := node.getFile(path)
if err != nil {
c.logger.Debug().WithField("path", path).Log("Invalid path")
return nil, fmt.Errorf("file not found")
}
c.logger.Debug().WithField("path", path).Log("File cluster path")
return data, nil
}

View File

@@ -3,6 +3,7 @@ package cluster
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
@@ -253,7 +254,7 @@ func (n *node) files() {
}
}
func (n *node) GetURL(path string) (string, error) {
func (n *node) getURL(path string) (string, error) {
// Remove prefix from path
prefix := n.prefix.FindString(path)
path = n.prefix.ReplaceAllString(path, "")
@@ -285,3 +286,17 @@ func (n *node) GetURL(path string) (string, error) {
return u, nil
}
func (n *node) getFile(path string) (io.ReadCloser, error) {
// Remove prefix from path
prefix := n.prefix.FindString(path)
path = n.prefix.ReplaceAllString(path, "")
if prefix == "memfs:" {
return n.peer.MemFSGetFile(path)
} else if prefix == "diskfs:" {
return n.peer.DiskFSGetFile(path)
}
return nil, fmt.Errorf("unknown prefix")
}

View File

@@ -2,7 +2,6 @@ package fs
import (
"io"
"net/http"
"time"
"github.com/datarhei/core/v16/cluster"
@@ -37,27 +36,13 @@ func (fs *filesystem) Open(path string) fs.File {
}
// Check if the file is available in the cluster
url, err := fs.cluster.GetURL(fs.what + ":" + path)
if err != nil {
return nil
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil
}
client := &http.Client{
Timeout: 15 * time.Second,
}
resp, err := client.Do(req)
data, err := fs.cluster.GetFile(fs.what + ":" + path)
if err != nil {
return nil
}
file := &file{
ReadCloser: resp.Body,
ReadCloser: data,
name: path,
}

View File

@@ -186,7 +186,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error {
// Add the new session's top bitrate to the ingress top bitrate
resultingBitrate := currentBitrate + streamBitrate
if resultingBitrate <= 0.5 || resultingBitrate >= maxBitrate {
if resultingBitrate >= maxBitrate {
return echo.NewHTTPError(509, "Bitrate limit exceeded")
}
}