Fix lasContact increasing on error

This commit is contained in:
Ingo Oppermann
2023-06-06 21:54:57 +02:00
parent fc03bf73a2
commit ef6d90d8ce

View File

@@ -2,6 +2,7 @@ package proxy
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@@ -285,7 +286,7 @@ func (n *node) Connect() error {
n.resources.throttling = true n.resources.throttling = true
n.resources.cpu = 100 n.resources.cpu = 100
n.resources.ncpu = 1 n.resources.ncpu = 1
n.resources.cpuLimit = 0 n.resources.cpuLimit = 100
n.resources.mem = 0 n.resources.mem = 0
n.resources.memLimit = 0 n.resources.memLimit = 0
n.state = stateDisconnected n.state = stateDisconnected
@@ -486,6 +487,11 @@ func (n *node) About() NodeAbout {
}, },
} }
if state == stateDisconnected {
nodeAbout.Uptime = 0
nodeAbout.Latency = 0
}
return nodeAbout return nodeAbout
} }
@@ -535,124 +541,147 @@ func (n *node) Files() NodeFiles {
return state return state
} }
var errNoPeer = errors.New("no peer")
func (n *node) files() { func (n *node) files() {
errorsChan := make(chan error, 8)
filesChan := make(chan string, 1024) filesChan := make(chan string, 1024)
filesList := []string{} filesList := []string{}
errorList := []error{}
ctx, cancel := context.WithCancel(context.Background())
wgList := sync.WaitGroup{} wgList := sync.WaitGroup{}
wgList.Add(1) wgList.Add(1)
go func() { go func(ctx context.Context) {
defer wgList.Done() defer wgList.Done()
for file := range filesChan { for {
filesList = append(filesList, file) select {
case <-ctx.Done():
return
case file := <-filesChan:
filesList = append(filesList, file)
case err := <-errorsChan:
errorList = append(errorList, err)
}
} }
}() }(ctx)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(2) wg.Add(2)
go func(f chan<- string) { go func(f chan<- string, e chan<- error) {
defer wg.Done() defer wg.Done()
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
if n.peer == nil { if n.peer == nil {
e <- errNoPeer
return return
} }
files, err := n.peer.MemFSList("name", "asc") files, err := n.peer.MemFSList("name", "asc")
if err != nil { if err != nil {
e <- err
return return
} }
for _, file := range files { for _, file := range files {
f <- "mem:" + file.Name f <- "mem:" + file.Name
} }
}(filesChan) }(filesChan, errorsChan)
go func(f chan<- string) { go func(f chan<- string, e chan<- error) {
defer wg.Done() defer wg.Done()
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
if n.peer == nil { if n.peer == nil {
e <- errNoPeer
return return
} }
files, err := n.peer.DiskFSList("name", "asc") files, err := n.peer.DiskFSList("name", "asc")
if err != nil { if err != nil {
e <- err
return return
} }
for _, file := range files { for _, file := range files {
f <- "disk:" + file.Name f <- "disk:" + file.Name
} }
}(filesChan) }(filesChan, errorsChan)
if n.hasRTMP { if n.hasRTMP {
wg.Add(1) wg.Add(1)
go func(f chan<- string) { go func(f chan<- string, e chan<- error) {
defer wg.Done() defer wg.Done()
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
if n.peer == nil { if n.peer == nil {
e <- errNoPeer
return return
} }
files, err := n.peer.RTMPChannels() files, err := n.peer.RTMPChannels()
if err != nil { if err != nil {
e <- err
return return
} }
for _, file := range files { for _, file := range files {
f <- "rtmp:" + file.Name f <- "rtmp:" + file.Name
} }
}(filesChan) }(filesChan, errorsChan)
} }
if n.hasSRT { if n.hasSRT {
wg.Add(1) wg.Add(1)
go func(f chan<- string) { go func(f chan<- string, e chan<- error) {
defer wg.Done() defer wg.Done()
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
if n.peer == nil { if n.peer == nil {
e <- errNoPeer
return return
} }
files, err := n.peer.SRTChannels() files, err := n.peer.SRTChannels()
if err != nil { if err != nil {
e <- err
return return
} }
for _, file := range files { for _, file := range files {
f <- "srt:" + file.Name f <- "srt:" + file.Name
} }
}(filesChan) }(filesChan, errorsChan)
} }
wg.Wait() wg.Wait()
close(filesChan) cancel()
wgList.Wait() wgList.Wait()
n.stateLock.Lock() n.stateLock.Lock()
n.filesList = make([]string, len(filesList)) if len(errorList) == 0 {
copy(n.filesList, filesList) n.filesList = make([]string, len(filesList))
n.lastUpdate = time.Now() copy(n.filesList, filesList)
n.lastContact = time.Now() n.lastUpdate = time.Now()
n.lastContact = n.lastUpdate
}
n.stateLock.Unlock() n.stateLock.Unlock()
} }