Mirror of https://github.com/datarhei/core.git (synced 2025-10-05 16:07:07 +08:00)
Fetch resources list in parallel
@@ -45,10 +45,10 @@ type ClusterConfig struct {
 }
 
 type cluster struct {
-	nodes    map[string]*node
-	idfiles  map[string][]string
-	idupdate map[string]time.Time
-	fileid   map[string]string
+	nodes    map[string]*node     // List of known nodes
+	idfiles  map[string][]string  // Map from nodeid to list of files
+	idupdate map[string]time.Time // Map from nodeid to time of last update
+	fileid   map[string]string    // Map from file name to nodeid
 
 	limiter net.IPLimiter
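The comments added above spell out the cluster's bookkeeping: idfiles and idupdate record what each node last reported and when, and fileid resolves a file name back to the node that hosts it. As a purely illustrative sketch (not part of this commit; the function name, staleness rule, and sample data are assumptions), a lookup against such maps could look like this:

package main

import (
	"fmt"
	"time"
)

// nodeForFile is a hypothetical lookup against the maps documented above:
// fileid maps a file name to the node that hosts it, idupdate records when
// that node last reported. The maxAge staleness check is an assumption
// added for illustration only.
func nodeForFile(fileid map[string]string, idupdate map[string]time.Time, file string, maxAge time.Duration) (string, bool) {
	nodeid, ok := fileid[file]
	if !ok {
		return "", false
	}

	if time.Since(idupdate[nodeid]) > maxAge {
		return "", false
	}

	return nodeid, true
}

func main() {
	fileid := map[string]string{"memfs:/a.ts": "node1"}
	idupdate := map[string]time.Time{"node1": time.Now()}

	if id, ok := nodeForFile(fileid, idupdate, "memfs:/a.ts", 5*time.Second); ok {
		fmt.Println("served by", id)
	}
}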
cluster/node.go (127 changed lines)
@@ -48,7 +48,7 @@ type node struct {
 	password   string
 	updates    chan<- NodeState
 	peer       client.RestClient
-	fileList   []string
+	filesList  []string
 	lastUpdate time.Time
 	lock       sync.RWMutex
 	cancel     context.CancelFunc
@@ -162,9 +162,7 @@ func newNode(address, username, password string, updates chan<- NodeState) (*nod
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			n.lock.Lock()
 			n.files()
-			n.lock.Unlock()
 
 			select {
 			case n.updates <- n.State():
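Note that the loop no longer takes n.lock around the call; the rewritten files() further down acquires the lock itself before touching the node's fields. The loop is the usual context-plus-ticker shape; a minimal standalone sketch with illustrative names:

package main

import (
	"context"
	"fmt"
	"time"
)

// runRefresh calls refresh on every tick until ctx is cancelled.
// refresh is expected to do its own locking, matching how the commit
// moves the lock acquisition out of the loop and into n.files().
func runRefresh(ctx context.Context, interval time.Duration, refresh func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			refresh()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	runRefresh(ctx, time.Second, func() { fmt.Println("refreshing file list") })
}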
@@ -202,8 +200,8 @@ func (n *node) State() NodeState {
 		state.State = stateDisconnected.String()
 	} else {
 		state.State = n.state.String()
-		state.Files = make([]string, len(n.fileList))
-		copy(state.Files, n.fileList)
+		state.Files = make([]string, len(n.filesList))
+		copy(state.Files, n.filesList)
 	}
 
 	return state
@@ -214,44 +212,101 @@ func (n *node) stop() {
 }
 
 func (n *node) files() {
-	memfsfiles, errMemfs := n.peer.MemFSList("name", "asc")
-	diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc")
-	rtmpfiles, errRTMP := n.peer.RTMPChannels()
-	srtfiles, errSRT := n.peer.SRTChannels()
-
-	n.lastUpdate = time.Now()
-
-	if errMemfs != nil || errDiskfs != nil || errRTMP != nil || errSRT != nil {
-		n.fileList = nil
-		n.state = stateDisconnected
-
-		return
-	}
-
-	n.state = stateConnected
-
-	n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles)+len(srtfiles))
-
-	nfiles := 0
-
-	for _, file := range memfsfiles {
-		n.fileList[nfiles] = "memfs:" + file.Name
-		nfiles++
-	}
-
-	for _, file := range diskfsfiles {
-		n.fileList[nfiles] = "diskfs:" + file.Name
-		nfiles++
-	}
-
-	for _, file := range rtmpfiles {
-		n.fileList[nfiles] = "rtmp:" + file.Name
-		nfiles++
-	}
-
-	for _, file := range srtfiles {
-		n.fileList[nfiles] = "srt:" + file.Name
-		nfiles++
-	}
+	filesChan := make(chan string, 1024)
+	filesList := []string{}
+
+	wgList := sync.WaitGroup{}
+	wgList.Add(1)
+
+	go func() {
+		defer wgList.Done()
+
+		for file := range filesChan {
+			if len(file) == 0 {
+				return
+			}
+
+			filesList = append(filesList, file)
+		}
+	}()
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	go func(f chan<- string) {
+		defer wg.Done()
+
+		files, err := n.peer.MemFSList("name", "asc")
+		if err != nil {
+			return
+		}
+
+		for _, file := range files {
+			filesChan <- "memfs:" + file.Name
+		}
+	}(filesChan)
+
+	go func(f chan<- string) {
+		defer wg.Done()
+
+		files, err := n.peer.DiskFSList("name", "asc")
+		if err != nil {
+			return
+		}
+
+		for _, file := range files {
+			filesChan <- "diskfs:" + file.Name
+		}
+	}(filesChan)
+
+	if n.hasRTMP {
+		wg.Add(1)
+
+		go func(f chan<- string) {
+			defer wg.Done()
+
+			files, err := n.peer.RTMPChannels()
+			if err != nil {
+				return
+			}
+
+			for _, file := range files {
+				filesChan <- "rtmp:" + file.Name
+			}
+		}(filesChan)
+	}
+
+	if n.hasSRT {
+		wg.Add(1)
+
+		go func(f chan<- string) {
+			defer wg.Done()
+
+			files, err := n.peer.SRTChannels()
+			if err != nil {
+				return
+			}
+
+			for _, file := range files {
+				filesChan <- "srt:" + file.Name
+			}
+		}(filesChan)
+	}
+
+	wg.Wait()
+
+	filesChan <- ""
+
+	wgList.Wait()
+
+	n.lock.Lock()
+	n.filesList = make([]string, len(filesList))
+	copy(n.filesList, filesList)
+	n.lastUpdate = time.Now()
+	n.state = stateConnected
+	n.lock.Unlock()
 }
 
 func (n *node) getURL(path string) (string, error) {
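The rewritten files() fans out one goroutine per resource type (memfs, diskfs, and optionally rtmp and srt), funnels the prefixed names into a buffered channel, and runs a single collector goroutine that stops when it reads the empty-string sentinel sent after all producers have finished; only then is the result copied into n.filesList under the lock. A condensed, self-contained sketch of that pattern (the fetcher functions and sample data below are placeholders, not the core's API):

package main

import (
	"fmt"
	"sync"
)

// Placeholder fetchers standing in for MemFSList, DiskFSList, RTMPChannels
// and SRTChannels; each returns the names for one resource type.
func fetchMemFS() []string  { return []string{"a.ts", "b.ts"} }
func fetchDiskFS() []string { return []string{"c.mp4"} }

func collectFiles() []string {
	filesChan := make(chan string, 1024)
	filesList := []string{}

	// Collector: drains the channel until it sees the empty-string sentinel.
	wgList := sync.WaitGroup{}
	wgList.Add(1)
	go func() {
		defer wgList.Done()
		for file := range filesChan {
			if len(file) == 0 {
				return
			}
			filesList = append(filesList, file)
		}
	}()

	// Producers: one goroutine per resource type, each prefixing its results.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for _, name := range fetchMemFS() {
			filesChan <- "memfs:" + name
		}
	}()
	go func() {
		defer wg.Done()
		for _, name := range fetchDiskFS() {
			filesChan <- "diskfs:" + name
		}
	}()

	wg.Wait()       // all producers are done
	filesChan <- "" // sentinel tells the collector to stop
	wgList.Wait()   // collector is done; filesList is complete

	return filesList
}

func main() {
	fmt.Println(collectFiles())
}

Closing filesChan after wg.Wait() would end the collector's range loop just as well; the commit signals completion with the empty-string sentinel instead, and the collected list is only published under the node lock once both wait groups have returned.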
@@ -2,6 +2,7 @@ package api
 
 import (
 	"net/http"
+	"sort"
 
 	"github.com/datarhei/core/v16/cluster"
 	"github.com/datarhei/core/v16/http/api"
@@ -147,6 +148,8 @@ func (h *ClusterHandler) GetNodeProxy(c echo.Context) error {
 
 	state := peer.State()
 
+	sort.Strings(state.Files)
+
 	return c.JSON(http.StatusOK, state.Files)
 }
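Because the names now arrive from concurrently running fetchers, their order in state.Files varies from run to run; sorting before returning keeps the proxy listing deterministic. A trivial standalone illustration (sample names only):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Order as it might arrive from the parallel fetchers.
	files := []string{"srt:cam1", "memfs:a.ts", "rtmp:live", "diskfs:c.mp4"}

	sort.Strings(files) // deterministic, lexicographic order

	fmt.Println(files) // [diskfs:c.mp4 memfs:a.ts rtmp:live srt:cam1]
}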