diff --git a/cluster/cluster.go b/cluster/cluster.go index 71b4d497..8acc7bce 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -45,10 +45,10 @@ type ClusterConfig struct { } type cluster struct { - nodes map[string]*node - idfiles map[string][]string - idupdate map[string]time.Time - fileid map[string]string + nodes map[string]*node // List of known nodes + idfiles map[string][]string // Map from nodeid to list of files + idupdate map[string]time.Time // Map from nodeid to time of last update + fileid map[string]string // Map from file name to nodeid limiter net.IPLimiter diff --git a/cluster/node.go b/cluster/node.go index 3f93975d..98b3e451 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -48,7 +48,7 @@ type node struct { password string updates chan<- NodeState peer client.RestClient - fileList []string + filesList []string lastUpdate time.Time lock sync.RWMutex cancel context.CancelFunc @@ -162,9 +162,7 @@ func newNode(address, username, password string, updates chan<- NodeState) (*nod case <-ctx.Done(): return case <-ticker.C: - n.lock.Lock() n.files() - n.lock.Unlock() select { case n.updates <- n.State(): @@ -202,8 +200,8 @@ func (n *node) State() NodeState { state.State = stateDisconnected.String() } else { state.State = n.state.String() - state.Files = make([]string, len(n.fileList)) - copy(state.Files, n.fileList) + state.Files = make([]string, len(n.filesList)) + copy(state.Files, n.filesList) } return state @@ -214,44 +212,101 @@ func (n *node) stop() { } func (n *node) files() { - memfsfiles, errMemfs := n.peer.MemFSList("name", "asc") - diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc") - rtmpfiles, errRTMP := n.peer.RTMPChannels() - srtfiles, errSRT := n.peer.SRTChannels() + filesChan := make(chan string, 1024) + filesList := []string{} - n.lastUpdate = time.Now() + wgList := sync.WaitGroup{} + wgList.Add(1) - if errMemfs != nil || errDiskfs != nil || errRTMP != nil || errSRT != nil { - n.fileList = nil - n.state = stateDisconnected - return + go func() { + defer wgList.Done() + + for file := range filesChan { + if len(file) == 0 { + return + } + + filesList = append(filesList, file) + } + }() + + wg := sync.WaitGroup{} + wg.Add(2) + + go func(f chan<- string) { + defer wg.Done() + + files, err := n.peer.MemFSList("name", "asc") + if err != nil { + return + } + + for _, file := range files { + filesChan <- "memfs:" + file.Name + } + }(filesChan) + + go func(f chan<- string) { + defer wg.Done() + + files, err := n.peer.DiskFSList("name", "asc") + if err != nil { + return + } + + for _, file := range files { + filesChan <- "diskfs:" + file.Name + } + }(filesChan) + + if n.hasRTMP { + wg.Add(1) + + go func(f chan<- string) { + defer wg.Done() + + files, err := n.peer.RTMPChannels() + if err != nil { + return + } + + for _, file := range files { + filesChan <- "rtmp:" + file.Name + } + }(filesChan) } + if n.hasSRT { + wg.Add(1) + + go func(f chan<- string) { + defer wg.Done() + + files, err := n.peer.SRTChannels() + if err != nil { + return + } + + for _, file := range files { + filesChan <- "srt:" + file.Name + } + }(filesChan) + } + + wg.Wait() + + filesChan <- "" + + wgList.Wait() + + n.lock.Lock() + + n.filesList = make([]string, len(filesList)) + copy(n.filesList, filesList) + n.lastUpdate = time.Now() n.state = stateConnected - n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles)+len(srtfiles)) - - nfiles := 0 - - for _, file := range memfsfiles { - n.fileList[nfiles] = "memfs:" + file.Name - nfiles++ - } - - for _, file := range diskfsfiles { - n.fileList[nfiles] = "diskfs:" + file.Name - nfiles++ - } - - for _, file := range rtmpfiles { - n.fileList[nfiles] = "rtmp:" + file.Name - nfiles++ - } - - for _, file := range srtfiles { - n.fileList[nfiles] = "srt:" + file.Name - nfiles++ - } + n.lock.Unlock() } func (n *node) getURL(path string) (string, error) { diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index e595f06a..0a1354d8 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -2,6 +2,7 @@ package api import ( "net/http" + "sort" "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/http/api" @@ -147,6 +148,8 @@ func (h *ClusterHandler) GetNodeProxy(c echo.Context) error { state := peer.State() + sort.Strings(state.Files) + return c.JSON(http.StatusOK, state.Files) }