Fix clusterNode/proxyNode

This commit is contained in:
Ingo Oppermann
2023-06-14 22:05:59 +02:00
parent cfc5650e1a
commit a728cc7839
14 changed files with 494 additions and 288 deletions

View File

@@ -873,11 +873,17 @@ func (a *api) start() error {
if len(config.Owner) == 0 { if len(config.Owner) == 0 {
identity = a.iam.GetDefaultVerifier() identity = a.iam.GetDefaultVerifier()
} else { } else {
identity, _ = a.iam.GetVerifier(config.Owner) var err error = nil
identity, err = a.iam.GetVerifier(config.Owner)
if err != nil {
identity = nil
}
} }
if identity != nil { if identity != nil {
u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth())
} else {
u.User = url.User(config.Owner)
} }
return u.String() return u.String()
@@ -891,11 +897,17 @@ func (a *api) start() error {
if len(config.Owner) == 0 { if len(config.Owner) == 0 {
identity = a.iam.GetDefaultVerifier() identity = a.iam.GetDefaultVerifier()
} else { } else {
identity, _ = a.iam.GetVerifier(config.Owner) var err error = nil
identity, err = a.iam.GetVerifier(config.Owner)
if err != nil {
identity = nil
}
} }
if identity != nil { if identity != nil {
u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth())
} else {
u.User = url.User(config.Owner)
} }
return u.String() return u.String()

View File

@@ -398,11 +398,11 @@ func (c *cluster) IsRaftLeader() bool {
return c.isRaftLeader return c.isRaftLeader
} }
func (c *cluster) IsDegraded() bool { func (c *cluster) IsDegraded() (bool, error) {
c.stateLock.Lock() c.stateLock.Lock()
defer c.stateLock.Unlock() defer c.stateLock.Unlock()
return c.isDegraded return c.isDegraded, c.isDegradedErr
} }
func (c *cluster) Leave(origin, id string) error { func (c *cluster) Leave(origin, id string) error {
@@ -608,7 +608,7 @@ func (c *cluster) Snapshot() (io.ReadCloser, error) {
} }
func (c *cluster) trackNodeChanges() { func (c *cluster) trackNodeChanges() {
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -629,7 +629,7 @@ func (c *cluster) trackNodeChanges() {
} }
for _, server := range servers { for _, server := range servers {
id := server.ID + server.Address id := server.ID
_, ok := c.nodes[id] _, ok := c.nodes[id]
if !ok { if !ok {
@@ -643,18 +643,15 @@ func (c *cluster) trackNodeChanges() {
logger.Warn().WithError(err).Log("Discovering cluster API address") logger.Warn().WithError(err).Log("Discovering cluster API address")
} }
cnode := NewClusterNode(address) cnode := NewClusterNode(id, address)
if !verifyClusterVersion(cnode.Version()) { if err := verifyClusterVersion(cnode.Version()); err != nil {
logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode")
} }
if err := verifyClusterConfig(c.config, cnode.Config()); err != nil {
logger.Warn().WithError(err).Log("Config mismatch. Cluster will end up in degraded mode")
}
if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil { if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil {
c.logger.Warn().WithError(err).Log("Adding node") logger.Warn().WithError(err).Log("Adding node")
cnode.Stop()
continue continue
} }
@@ -700,35 +697,38 @@ func (c *cluster) checkClusterNodes() error {
defer c.nodesLock.RUnlock() defer c.nodesLock.RUnlock()
for id, node := range c.nodes { for id, node := range c.nodes {
if status, statusErr := node.Status(); status == "offline" { if status, err := node.Status(); status == "offline" {
return fmt.Errorf("node %s is offline: %w", id, statusErr) return fmt.Errorf("node %s is offline: %w", id, err)
} }
version := node.Version() version := node.Version()
if !verifyClusterVersion(version) { if err := verifyClusterVersion(version); err != nil {
return fmt.Errorf("node %s has a different version: %s", id, version) return fmt.Errorf("node %s has a different version: %s: %w", id, version, err)
} }
config := node.Config() config, err := node.Config()
if configErr := verifyClusterConfig(c.config, config); configErr != nil { if err != nil {
return fmt.Errorf("node %s has a different configuration: %w", id, configErr) return fmt.Errorf("node %s has no configuration available: %w", id, err)
}
if err := verifyClusterConfig(c.config, config); err != nil {
return fmt.Errorf("node %s has a different configuration: %w", id, err)
} }
} }
return nil return nil
} }
func verifyClusterVersion(v string) bool { func verifyClusterVersion(v string) error {
version, err := ParseClusterVersion(v) version, err := ParseClusterVersion(v)
if err != nil { if err != nil {
return false return fmt.Errorf("parsing version %s: %w", v, err)
} }
if !Version.Equal(version) { if !Version.Equal(version) {
return false return fmt.Errorf("version %s not equal to expected version %s", version.String(), Version.String())
} }
return true return nil
} }
func verifyClusterConfig(local, remote *config.Config) error { func verifyClusterConfig(local, remote *config.Config) error {
@@ -806,7 +806,7 @@ func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) {
} }
func (c *cluster) AddProcess(origin string, config *app.Config) error { func (c *cluster) AddProcess(origin string, config *app.Config) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -825,7 +825,7 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error {
} }
func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -844,7 +844,7 @@ func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error {
} }
func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error { func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -864,7 +864,7 @@ func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Con
} }
func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -939,7 +939,7 @@ func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy)
} }
func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -958,7 +958,7 @@ func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error {
} }
func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error { func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -978,7 +978,7 @@ func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User)
} }
func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error { func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -998,7 +998,7 @@ func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy)
} }
func (c *cluster) RemoveIdentity(origin string, name string) error { func (c *cluster) RemoveIdentity(origin string, name string) error {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -1057,15 +1057,17 @@ type ClusterAbout struct {
Nodes []proxy.NodeAbout Nodes []proxy.NodeAbout
Version ClusterVersion Version ClusterVersion
Degraded bool Degraded bool
DegradedErr error
} }
func (c *cluster) About() (ClusterAbout, error) { func (c *cluster) About() (ClusterAbout, error) {
degraded := c.IsDegraded() degraded, degradedErr := c.IsDegraded()
about := ClusterAbout{ about := ClusterAbout{
ID: c.id, ID: c.id,
Address: c.Address(), Address: c.Address(),
Degraded: degraded, Degraded: degraded,
DegradedErr: degradedErr,
} }
if address, err := c.ClusterAPIAddress(""); err == nil { if address, err := c.ClusterAPIAddress(""); err == nil {

View File

@@ -334,7 +334,7 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval tim
return return
case <-ticker.C: case <-ticker.C:
if !emergency { if !emergency {
if c.IsDegraded() { if ok, _ := c.IsDegraded(); ok {
break break
} }

View File

@@ -10,11 +10,14 @@ import (
"github.com/datarhei/core/v16/cluster/client" "github.com/datarhei/core/v16/cluster/client"
"github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/cluster/proxy"
"github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/config"
"github.com/datarhei/core/v16/config/copy"
) )
type clusterNode struct { type clusterNode struct {
client client.APIClient client client.APIClient
id string
address string
version string version string
lastContact time.Time lastContact time.Time
lastContactErr error lastContactErr error
@@ -26,13 +29,13 @@ type clusterNode struct {
runLock sync.Mutex runLock sync.Mutex
cancelPing context.CancelFunc cancelPing context.CancelFunc
proxyLock sync.Mutex proxyNode proxy.Node
proxyNode proxy.Node
proxyNodeErr error
} }
func NewClusterNode(address string) *clusterNode { func NewClusterNode(id, address string) *clusterNode {
n := &clusterNode{ n := &clusterNode{
id: id,
address: address,
version: "0.0.0", version: "0.0.0",
client: client.APIClient{ client: client.APIClient{
Address: address, Address: address,
@@ -49,18 +52,12 @@ func NewClusterNode(address string) *clusterNode {
n.version = version n.version = version
p := proxy.NewNode(address) n.start(id)
err = p.Connect()
if err == nil {
n.proxyNode = p
} else {
n.proxyNodeErr = err
}
return n return n
} }
func (n *clusterNode) Start() error { func (n *clusterNode) start(id string) error {
n.runLock.Lock() n.runLock.Lock()
defer n.runLock.Unlock() defer n.runLock.Unlock()
@@ -74,33 +71,39 @@ func (n *clusterNode) Start() error {
go n.ping(ctx) go n.ping(ctx)
go n.pingCore(ctx) go n.pingCore(ctx)
address, _ := n.CoreAPIAddress() n.lastCoreContactErr = fmt.Errorf("not started yet")
n.lastContactErr = fmt.Errorf("not started yet")
address, err := n.CoreAPIAddress()
n.proxyNode = proxy.NewNode(id, address)
p := proxy.NewNode(address)
err := p.Connect()
if err != nil { if err != nil {
n.proxyNodeErr = err n.lastCoreContactErr = err
go func(ctx context.Context) {
go func(ctx context.Context, address string) { ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
p := proxy.NewNode(address) address, err := n.CoreAPIAddress()
err := p.Connect() n.pingLock.Lock()
if err == nil {
n.proxyNode.SetAddress(address)
n.lastCoreContactErr = nil
} else {
n.lastCoreContactErr = err
}
n.pingLock.Unlock()
if err == nil { if err == nil {
n.proxyNode = p
return return
} }
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
}(ctx, address) }(ctx)
} else {
n.proxyNode = p
} }
return nil return nil
@@ -153,8 +156,84 @@ func (n *clusterNode) LastContact() time.Time {
return n.lastContact return n.lastContact
} }
func (n *clusterNode) Config() *config.Config { func (n *clusterNode) Config() (*config.Config, error) {
return nil if n.proxyNode == nil {
return nil, fmt.Errorf("proxy not available")
}
apiconfig := n.proxyNode.Config()
if apiconfig == nil {
return nil, fmt.Errorf("no config stored")
}
config := config.New(nil)
config.Version = apiconfig.Version
config.Version = apiconfig.Version
config.Version = apiconfig.Version
config.ID = apiconfig.ID
config.Name = apiconfig.Name
config.Address = apiconfig.Address
config.CheckForUpdates = apiconfig.CheckForUpdates
config.Log = apiconfig.Log
config.DB = apiconfig.DB
config.Host = apiconfig.Host
config.API.ReadOnly = apiconfig.API.ReadOnly
config.API.Access = apiconfig.API.Access
config.API.Auth.Enable = apiconfig.API.Auth.Enable
config.API.Auth.DisableLocalhost = apiconfig.API.Auth.DisableLocalhost
config.API.Auth.Username = apiconfig.API.Auth.Username
config.API.Auth.Password = apiconfig.API.Auth.Password
config.API.Auth.JWT = apiconfig.API.Auth.JWT
config.TLS = apiconfig.TLS
config.Storage.MimeTypes = apiconfig.Storage.MimeTypes
config.Storage.Disk = apiconfig.Storage.Disk
config.Storage.Memory = apiconfig.Storage.Memory
config.Storage.CORS = apiconfig.Storage.CORS
config.RTMP = apiconfig.RTMP
config.SRT = apiconfig.SRT
config.FFmpeg = apiconfig.FFmpeg
config.Playout = apiconfig.Playout
config.Debug = apiconfig.Debug
config.Metrics = apiconfig.Metrics
config.Sessions = apiconfig.Sessions
config.Service = apiconfig.Service
config.Router = apiconfig.Router
config.Resources = apiconfig.Resources
config.Cluster = apiconfig.Cluster
config.Log.Topics = copy.Slice(apiconfig.Log.Topics)
config.Host.Name = copy.Slice(apiconfig.Host.Name)
config.API.Access.HTTP.Allow = copy.Slice(apiconfig.API.Access.HTTP.Allow)
config.API.Access.HTTP.Block = copy.Slice(apiconfig.API.Access.HTTP.Block)
config.API.Access.HTTPS.Allow = copy.Slice(apiconfig.API.Access.HTTPS.Allow)
config.API.Access.HTTPS.Block = copy.Slice(apiconfig.API.Access.HTTPS.Block)
//config.API.Auth.Auth0.Tenants = copy.TenantSlice(d.API.Auth.Auth0.Tenants)
config.Storage.CORS.Origins = copy.Slice(apiconfig.Storage.CORS.Origins)
config.Storage.Disk.Cache.Types.Allow = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Allow)
config.Storage.Disk.Cache.Types.Block = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Block)
//config.Storage.S3 = copy.Slice(d.Storage.S3)
config.FFmpeg.Access.Input.Allow = copy.Slice(apiconfig.FFmpeg.Access.Input.Allow)
config.FFmpeg.Access.Input.Block = copy.Slice(apiconfig.FFmpeg.Access.Input.Block)
config.FFmpeg.Access.Output.Allow = copy.Slice(apiconfig.FFmpeg.Access.Output.Allow)
config.FFmpeg.Access.Output.Block = copy.Slice(apiconfig.FFmpeg.Access.Output.Block)
config.Sessions.IPIgnoreList = copy.Slice(apiconfig.Sessions.IPIgnoreList)
config.SRT.Log.Topics = copy.Slice(apiconfig.SRT.Log.Topics)
config.Router.BlockedPrefixes = copy.Slice(apiconfig.Router.BlockedPrefixes)
config.Router.Routes = copy.StringMap(apiconfig.Router.Routes)
config.Cluster.Peers = copy.Slice(apiconfig.Cluster.Peers)
return config, nil
} }
func (n *clusterNode) CoreAPIAddress() (string, error) { func (n *clusterNode) CoreAPIAddress() (string, error) {
@@ -198,22 +277,16 @@ func (n *clusterNode) pingCore(ctx context.Context) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
var err error _, err := n.proxyNode.IsConnected()
n.proxyLock.Lock()
if n.proxyNode == nil {
err = fmt.Errorf("can't connect to core api: %w", n.proxyNodeErr)
} else {
_, err = n.proxyNode.Ping()
}
n.proxyLock.Unlock()
if err == nil { if err == nil {
n.pingLock.Lock() n.pingLock.Lock()
n.lastCoreContact = time.Now() n.lastCoreContact = time.Now()
n.lastCoreContactErr = nil
n.pingLock.Unlock() n.pingLock.Unlock()
} else { } else {
n.pingLock.Lock() n.pingLock.Lock()
n.lastCoreContactErr = err n.lastCoreContactErr = fmt.Errorf("not connected to core api: %w", err)
n.pingLock.Unlock() n.pingLock.Unlock()
} }
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -19,10 +19,11 @@ import (
) )
type Node interface { type Node interface {
Connect() error SetAddress(address string)
IsConnected() (bool, error)
Disconnect() Disconnect()
Config() clientapi.ConfigV3 Config() *clientapi.ConfigV3
StartFiles(updates chan<- NodeFiles) error StartFiles(updates chan<- NodeFiles) error
StopFiles() StopFiles()
@@ -98,10 +99,12 @@ const (
) )
type node struct { type node struct {
id string
address string address string
ips []string ips []string
peer client.RestClient peer client.RestClient
peerErr error
peerLock sync.RWMutex peerLock sync.RWMutex
peerWg sync.WaitGroup peerWg sync.WaitGroup
disconnect context.CancelFunc disconnect context.CancelFunc
@@ -117,7 +120,7 @@ type node struct {
memLimit uint64 memLimit uint64
} }
config clientapi.ConfigV3 config *clientapi.ConfigV3
state nodeState state nodeState
latency float64 // Seconds latency float64 // Seconds
@@ -138,17 +141,60 @@ type node struct {
srtAddress *url.URL srtAddress *url.URL
} }
func NewNode(address string) Node { func NewNode(id, address string) Node {
n := &node{ n := &node{
id: id,
address: address, address: address,
state: stateDisconnected, state: stateDisconnected,
secure: strings.HasPrefix(address, "https://"), secure: strings.HasPrefix(address, "https://"),
} }
n.resources.throttling = true
n.resources.cpu = 100
n.resources.ncpu = 1
n.resources.cpuLimit = 100
n.resources.mem = 0
n.resources.memLimit = 0
ctx, cancel := context.WithCancel(context.Background())
n.disconnect = cancel
err := n.connect(ctx)
if err != nil {
n.peerErr = err
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := n.connect(ctx)
if err == nil {
n.peerErr = nil
return
} else {
n.peerErr = err
}
case <-ctx.Done():
return
}
}
}(ctx)
}
return n return n
} }
func (n *node) Connect() error { func (n *node) SetAddress(address string) {
n.peerLock.Lock()
defer n.peerLock.Unlock()
n.address = address
}
func (n *node) connect(ctx context.Context) error {
n.peerLock.Lock() n.peerLock.Lock()
defer n.peerLock.Unlock() defer n.peerLock.Unlock()
@@ -156,6 +202,10 @@ func (n *node) Connect() error {
return nil return nil
} }
if len(n.address) == 0 {
return fmt.Errorf("no address provided")
}
u, err := url.Parse(n.address) u, err := url.Parse(n.address)
if err != nil { if err != nil {
return fmt.Errorf("invalid address: %w", err) return fmt.Errorf("invalid address: %w", err)
@@ -195,7 +245,7 @@ func (n *node) Connect() error {
return fmt.Errorf("failed to convert config to expected version") return fmt.Errorf("failed to convert config to expected version")
} }
n.config = config n.config = &config
n.httpAddress = u n.httpAddress = u
@@ -237,129 +287,165 @@ func (n *node) Connect() error {
n.peer = peer n.peer = peer
ctx, cancel := context.WithCancel(context.Background())
n.disconnect = cancel
n.peerWg.Add(2) n.peerWg.Add(2)
go func(ctx context.Context) { go n.pingPeer(ctx, &n.peerWg)
ticker := time.NewTicker(time.Second) go n.updateResources(ctx, &n.peerWg)
defer ticker.Stop()
defer n.peerWg.Done()
for {
select {
case <-ticker.C:
// Ping
latency, err := n.Ping()
n.stateLock.Lock()
if err != nil {
n.state = stateDisconnected
} else {
n.lastContact = time.Now()
n.state = stateConnected
}
n.latency = n.latency*0.2 + latency.Seconds()*0.8
n.stateLock.Unlock()
case <-ctx.Done():
return
}
}
}(ctx)
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer n.peerWg.Done()
for {
select {
case <-ticker.C:
// Metrics
metrics, err := n.peer.Metrics(clientapi.MetricsQuery{
Metrics: []clientapi.MetricsQueryMetric{
{Name: "cpu_ncpu"},
{Name: "cpu_idle"},
{Name: "cpu_limit"},
{Name: "cpu_throttling"},
{Name: "mem_total"},
{Name: "mem_free"},
{Name: "mem_limit"},
{Name: "mem_throttling"},
},
})
if err != nil {
n.stateLock.Lock()
n.resources.throttling = true
n.resources.cpu = 100
n.resources.ncpu = 1
n.resources.cpuLimit = 100
n.resources.mem = 0
n.resources.memLimit = 0
n.state = stateDisconnected
n.stateLock.Unlock()
continue
}
cpu_ncpu := .0
cpu_idle := .0
cpu_limit := .0
mem_total := uint64(0)
mem_free := uint64(0)
mem_limit := uint64(0)
throttling := .0
for _, x := range metrics.Metrics {
if x.Name == "cpu_idle" {
cpu_idle = x.Values[0].Value
} else if x.Name == "cpu_ncpu" {
cpu_ncpu = x.Values[0].Value
} else if x.Name == "cpu_limit" {
cpu_limit = x.Values[0].Value
} else if x.Name == "cpu_throttling" {
throttling += x.Values[0].Value
} else if x.Name == "mem_total" {
mem_total = uint64(x.Values[0].Value)
} else if x.Name == "mem_free" {
mem_free = uint64(x.Values[0].Value)
} else if x.Name == "mem_limit" {
mem_limit = uint64(x.Values[0].Value)
} else if x.Name == "mem_throttling" {
throttling += x.Values[0].Value
}
}
n.stateLock.Lock()
if throttling > 0 {
n.resources.throttling = true
} else {
n.resources.throttling = false
}
n.resources.ncpu = cpu_ncpu
n.resources.cpu = (100 - cpu_idle) * cpu_ncpu
n.resources.cpuLimit = cpu_limit * cpu_ncpu
if mem_total != 0 {
n.resources.mem = mem_total - mem_free
n.resources.memLimit = mem_limit
} else {
n.resources.mem = 0
n.resources.memLimit = 0
}
n.lastContact = time.Now()
n.stateLock.Unlock()
case <-ctx.Done():
return
}
}
}(ctx)
return nil return nil
} }
func (n *node) Config() clientapi.ConfigV3 { func (n *node) IsConnected() (bool, error) {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
fmt.Printf("\n***** n.peer is nil\n")
return false, fmt.Errorf("not connected: %w", n.peerErr)
}
return true, nil
}
func (n *node) Disconnect() {
n.peerLock.Lock()
if n.disconnect != nil {
n.disconnect()
n.disconnect = nil
}
n.peerLock.Unlock()
n.peerWg.Wait()
n.peerLock.Lock()
n.peer = nil
n.peerErr = fmt.Errorf("disconnected")
n.peerLock.Unlock()
}
func (n *node) pingPeer(ctx context.Context, wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer wg.Done()
for {
select {
case <-ticker.C:
// Ping
latency, err := n.Ping()
n.peerLock.Lock()
n.peerErr = err
n.peerLock.Unlock()
n.stateLock.Lock()
if err != nil {
n.state = stateDisconnected
} else {
n.lastContact = time.Now()
n.state = stateConnected
}
n.latency = n.latency*0.2 + latency.Seconds()*0.8
n.stateLock.Unlock()
case <-ctx.Done():
return
}
}
}
func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer wg.Done()
for {
select {
case <-ticker.C:
// Metrics
metrics, err := n.peer.Metrics(clientapi.MetricsQuery{
Metrics: []clientapi.MetricsQueryMetric{
{Name: "cpu_ncpu"},
{Name: "cpu_idle"},
{Name: "cpu_limit"},
{Name: "cpu_throttling"},
{Name: "mem_total"},
{Name: "mem_free"},
{Name: "mem_limit"},
{Name: "mem_throttling"},
},
})
n.peerLock.Lock()
n.peerErr = err
n.peerLock.Unlock()
if err != nil {
n.stateLock.Lock()
n.resources.throttling = true
n.resources.cpu = 100
n.resources.ncpu = 1
n.resources.cpuLimit = 100
n.resources.mem = 0
n.resources.memLimit = 0
n.state = stateDisconnected
n.stateLock.Unlock()
continue
}
cpu_ncpu := .0
cpu_idle := .0
cpu_limit := .0
mem_total := uint64(0)
mem_free := uint64(0)
mem_limit := uint64(0)
throttling := .0
for _, x := range metrics.Metrics {
if x.Name == "cpu_idle" {
cpu_idle = x.Values[0].Value
} else if x.Name == "cpu_ncpu" {
cpu_ncpu = x.Values[0].Value
} else if x.Name == "cpu_limit" {
cpu_limit = x.Values[0].Value
} else if x.Name == "cpu_throttling" {
throttling += x.Values[0].Value
} else if x.Name == "mem_total" {
mem_total = uint64(x.Values[0].Value)
} else if x.Name == "mem_free" {
mem_free = uint64(x.Values[0].Value)
} else if x.Name == "mem_limit" {
mem_limit = uint64(x.Values[0].Value)
} else if x.Name == "mem_throttling" {
throttling += x.Values[0].Value
}
}
n.stateLock.Lock()
if throttling > 0 {
n.resources.throttling = true
} else {
n.resources.throttling = false
}
n.resources.ncpu = cpu_ncpu
n.resources.cpu = (100 - cpu_idle) * cpu_ncpu
n.resources.cpuLimit = cpu_limit * cpu_ncpu
if mem_total != 0 {
n.resources.mem = mem_total - mem_free
n.resources.memLimit = mem_limit
} else {
n.resources.mem = 0
n.resources.memLimit = 0
}
n.lastContact = time.Now()
n.stateLock.Unlock()
case <-ctx.Done():
return
}
}
}
func (n *node) Config() *clientapi.ConfigV3 {
return n.config return n.config
} }
@@ -371,12 +457,7 @@ func (n *node) Ping() (time.Duration, error) {
return 0, fmt.Errorf("not connected") return 0, fmt.Errorf("not connected")
} }
ok, latency := n.peer.Ping() latency, err := n.peer.Ping()
var err error = nil
if !ok {
err = fmt.Errorf("not connected")
}
return latency, err return latency, err
} }
@@ -400,22 +481,7 @@ func (n *node) AboutPeer() (clientapi.About, error) {
return clientapi.About{}, fmt.Errorf("not connected") return clientapi.About{}, fmt.Errorf("not connected")
} }
return n.peer.About(), nil return n.peer.About(false)
}
func (n *node) Disconnect() {
n.peerLock.Lock()
if n.disconnect != nil {
n.disconnect()
n.disconnect = nil
}
n.peerLock.Unlock()
n.peerWg.Wait()
n.peerLock.Lock()
n.peer = nil
n.peerLock.Unlock()
} }
func (n *node) StartFiles(updates chan<- NodeFiles) error { func (n *node) StartFiles(updates chan<- NodeFiles) error {
@@ -471,7 +537,9 @@ func (n *node) About() NodeAbout {
about, err := n.AboutPeer() about, err := n.AboutPeer()
if err != nil { if err != nil {
return NodeAbout{ return NodeAbout{
State: stateDisconnected.String(), ID: n.id,
Address: n.address,
State: stateDisconnected.String(),
} }
} }
@@ -489,7 +557,7 @@ func (n *node) About() NodeAbout {
} }
nodeAbout := NodeAbout{ nodeAbout := NodeAbout{
ID: about.ID, ID: n.id,
Name: about.Name, Name: about.Name,
Address: n.address, Address: n.address,
State: state.String(), State: state.String(),

View File

@@ -249,9 +249,9 @@ func (p *proxy) Resources() map[string]NodeResources {
func (p *proxy) AddNode(id string, node Node) (string, error) { func (p *proxy) AddNode(id string, node Node) (string, error) {
about := node.About() about := node.About()
if id != about.ID { //if id != about.ID {
return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID) // return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID)
} //}
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()

View File

@@ -287,9 +287,9 @@ func (d *Config) init() {
d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false)
d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false) d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false)
d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false) d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false)
d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false) d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false) d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false) d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
} }
// Validate validates the current state of the Config for completeness and sanity. Errors are // Validate validates the current state of the Config for completeness and sanity. Errors are

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.17.2 github.com/caddyserver/certmagic v0.17.2
github.com/casbin/casbin/v2 v2.69.1 github.com/casbin/casbin/v2 v2.69.1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e
github.com/datarhei/gosrt v0.4.1 github.com/datarhei/gosrt v0.4.1
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0 github.com/fujiwara/shapeio v1.0.0

12
go.sum
View File

@@ -8,6 +8,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/adhocore/gronx v1.1.2 h1:Hgm+d8SyGtn+rCoDkxZq3nLNFLLkzRGR7L2ziRRD1w8= github.com/adhocore/gronx v1.1.2 h1:Hgm+d8SyGtn+rCoDkxZq3nLNFLLkzRGR7L2ziRRD1w8=
github.com/adhocore/gronx v1.1.2/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/adhocore/gronx v1.1.2/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
@@ -27,6 +29,7 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+
github.com/atrox/haikunatorgo/v2 v2.0.1 h1:FCVx2KL2YvZtI1rI9WeEHxeLRrKGr0Dd4wfCJiUXupc= github.com/atrox/haikunatorgo/v2 v2.0.1 h1:FCVx2KL2YvZtI1rI9WeEHxeLRrKGr0Dd4wfCJiUXupc=
github.com/atrox/haikunatorgo/v2 v2.0.1/go.mod h1:BBQmx2o+1Z5poziaHRgddAZKOpijwfKdAmMnSYlFK70= github.com/atrox/haikunatorgo/v2 v2.0.1/go.mod h1:BBQmx2o+1Z5poziaHRgddAZKOpijwfKdAmMnSYlFK70=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -49,6 +52,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece h1:Gv+W986jLcMa/TOKg5YF3RMDlNDDyj7uHuH+mHP7xq8= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece h1:Gv+W986jLcMa/TOKg5YF3RMDlNDDyj7uHuH+mHP7xq8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614130211-fb0f92af8ac9 h1:ntM1tymajXx92ydwi6RSiDG54aQV3cMOtlGRBT6p9Z8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614130211-fb0f92af8ac9/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1:iQKqGTyIdCyO7kY/G5MCKhzt3xZ5YPRubbJskVp5EvQ=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs= github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs=
github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA= github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=
@@ -72,6 +79,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -154,11 +162,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -228,6 +238,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -280,6 +291,7 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=

View File

@@ -17,7 +17,6 @@ type ClusterNode struct {
Latency float64 `json:"latency_ms"` // milliseconds Latency float64 `json:"latency_ms"` // milliseconds
State string `json:"state"` State string `json:"state"`
Resources ClusterNodeResources `json:"resources"` Resources ClusterNodeResources `json:"resources"`
Version string `json:"version"`
} }
func (n *ClusterNode) Marshal(about proxy.NodeAbout) { func (n *ClusterNode) Marshal(about proxy.NodeAbout) {
@@ -73,6 +72,7 @@ type ClusterAbout struct {
Nodes []ClusterNode `json:"nodes"` Nodes []ClusterNode `json:"nodes"`
Version string `json:"version"` Version string `json:"version"`
Degraded bool `json:"degraded"` Degraded bool `json:"degraded"`
DegradedErr string `json:"degraded_error"`
} }
type ClusterProcess struct { type ClusterProcess struct {

View File

@@ -77,6 +77,10 @@ func (h *ClusterHandler) About(c echo.Context) error {
Degraded: state.Degraded, Degraded: state.Degraded,
} }
if state.DegradedErr != nil {
about.DegradedErr = state.DegradedErr.Error()
}
for _, n := range state.Raft.Server { for _, n := range state.Raft.Server {
about.Raft.Server = append(about.Raft.Server, api.ClusterRaftServer{ about.Raft.Server = append(about.Raft.Server, api.ClusterRaftServer{
ID: n.ID, ID: n.ID,

View File

@@ -407,8 +407,9 @@ type ConfigV3 struct {
} `json:"output"` } `json:"output"`
} `json:"access"` } `json:"access"`
Log struct { Log struct {
MaxLines int `json:"max_lines" format:"int"` MaxLines int `json:"max_lines" format:"int"`
MaxHistory int `json:"max_history" format:"int"` MaxHistory int `json:"max_history" format:"int"`
MaxMinimalHistory int `json:"max_minimal_history" format:"int"`
} `json:"log"` } `json:"log"`
} `json:"ffmpeg"` } `json:"ffmpeg"`
Playout struct { Playout struct {
@@ -417,9 +418,10 @@ type ConfigV3 struct {
MaxPort int `json:"max_port" format:"int"` MaxPort int `json:"max_port" format:"int"`
} `json:"playout"` } `json:"playout"`
Debug struct { Debug struct {
Profiling bool `json:"profiling"` Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc" format:"int"` ForceGC int `json:"force_gc" format:"int"` // deprecated, use MemoryLimit instead
MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"` MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"`
AutoMaxProcs bool `json:"auto_max_procs"`
} `json:"debug"` } `json:"debug"`
Metrics struct { Metrics struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
@@ -447,16 +449,17 @@ type ConfigV3 struct {
UIPath string `json:"ui_path"` UIPath string `json:"ui_path"`
} `json:"router"` } `json:"router"`
Resources struct { Resources struct {
MaxCPUUsage float64 `json:"max_cpu_usage"` MaxCPUUsage float64 `json:"max_cpu_usage"` // percent 0-100
MaxMemoryUsage float64 `json:"max_memory_usage"` MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100
} `json:"resources"` } `json:"resources"`
Cluster struct { Cluster struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Bootstrap bool `json:"bootstrap"` Debug bool `json:"debug"`
Recover bool `json:"recover"` Address string `json:"address"` // ip:port
Debug bool `json:"debug"` Peers []string `json:"peers"`
Address string `json:"address"` SyncInterval int64 `json:"sync_interval_sec" format:"int64"` // seconds
Peers []string `json:"peers"` NodeRecoverTimeout int64 `json:"node_recover_timeout_sec" format:"int64"` // seconds
EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds
} `json:"cluster"` } `json:"cluster"`
} }

View File

@@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"sync"
"time" "time"
"github.com/datarhei/core-client-go/v16/api" "github.com/datarhei/core-client-go/v16/api"
@@ -38,13 +39,13 @@ type RestClient interface {
// Address returns the address of the connected datarhei Core // Address returns the address of the connected datarhei Core
Address() string Address() string
Ping() (bool, time.Duration) Ping() (time.Duration, error)
About() api.About // GET / About(cached bool) (api.About, error) // GET /
Config() (int64, api.Config, error) // GET /config Config() (int64, api.Config, error) // GET /v3/config
ConfigSet(config interface{}) error // POST /config ConfigSet(config interface{}) error // POST /v3/config
ConfigReload() error // GET /config/reload ConfigReload() error // GET /v3/config/reload
Graph(query api.GraphQuery) (api.GraphResponse, error) // POST /graph Graph(query api.GraphQuery) (api.GraphResponse, error) // POST /graph
@@ -66,7 +67,7 @@ type RestClient interface {
FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path} FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path}
FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path} FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path}
Log() ([]api.LogEvent, error) // GET /log Log() ([]api.LogEvent, error) // GET /v3/log
Metadata(key string) (api.Metadata, error) // GET /v3/metadata/{key} Metadata(key string) (api.Metadata, error) // GET /v3/metadata/{key}
MetadataSet(key string, metadata api.Metadata) error // PUT /v3/metadata/{key} MetadataSet(key string, metadata api.Metadata) error // PUT /v3/metadata/{key}
@@ -130,6 +131,7 @@ type restclient struct {
auth0Token string auth0Token string
client HTTPClient client HTTPClient
about api.About about api.About
aboutLock sync.RWMutex
version struct { version struct {
connectedCore *semver.Version connectedCore *semver.Version
@@ -152,42 +154,29 @@ func New(config Config) (RestClient, error) {
} }
u, err := url.Parse(r.address) u, err := url.Parse(r.address)
if err != nil { if err == nil {
return nil, err username := u.User.Username()
if len(username) != 0 {
r.username = username
}
if password, ok := u.User.Password(); ok {
r.password = password
}
u.User = nil
u.RawQuery = ""
u.Fragment = ""
r.address = u.String()
} }
username := u.User.Username()
if len(username) != 0 {
r.username = username
}
if password, ok := u.User.Password(); ok {
r.password = password
}
u.User = nil
u.RawQuery = ""
u.Fragment = ""
r.address = u.String()
if r.client == nil { if r.client == nil {
r.client = &http.Client{ r.client = &http.Client{
Timeout: 15 * time.Second, Timeout: 15 * time.Second,
} }
} }
about, err := r.info()
if err != nil {
return nil, err
}
r.about = about
if r.about.App != coreapp {
return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", r.about.Name, coreapp)
}
mustNewConstraint := func(constraint string) *semver.Constraints { mustNewConstraint := func(constraint string) *semver.Constraints {
v, _ := semver.NewConstraint(constraint) v, _ := semver.NewConstraint(constraint)
return v return v
@@ -198,20 +187,35 @@ func New(config Config) (RestClient, error) {
"GET/api/v3/metrics": mustNewConstraint("^16.10.0"), "GET/api/v3/metrics": mustNewConstraint("^16.10.0"),
} }
if len(r.about.ID) != 0 { about, err := r.info()
if err != nil {
return nil, err
}
if about.App != coreapp {
return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", about.Name, coreapp)
}
r.aboutLock.Lock()
r.about = about
r.aboutLock.Unlock()
if len(about.ID) != 0 {
c, _ := semver.NewConstraint(coreversion) c, _ := semver.NewConstraint(coreversion)
v, err := semver.NewVersion(r.about.Version.Number) v, err := semver.NewVersion(about.Version.Number)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !c.Check(v) { if !c.Check(v) {
return nil, fmt.Errorf("the core version (%s) is not supported, because a version %s is required", r.about.Version.Number, coreversion) return nil, fmt.Errorf("the core version (%s) is not supported, because a version %s is required", about.Version.Number, coreversion)
} }
r.aboutLock.Lock()
r.version.connectedCore = v r.version.connectedCore = v
r.aboutLock.Unlock()
} else { } else {
v, err := semver.NewVersion(r.about.Version.Number) v, err := semver.NewVersion(about.Version.Number)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -228,11 +232,17 @@ func New(config Config) (RestClient, error) {
return r, nil return r, nil
} }
func (r restclient) String() string { func (r *restclient) String() string {
r.aboutLock.RLock()
defer r.aboutLock.RUnlock()
return fmt.Sprintf("%s %s (%s) %s @ %s", r.about.Name, r.about.Version.Number, r.about.Version.Arch, r.about.ID, r.address) return fmt.Sprintf("%s %s (%s) %s @ %s", r.about.Name, r.about.Version.Number, r.about.Version.Arch, r.about.ID, r.address)
} }
func (r *restclient) ID() string { func (r *restclient) ID() string {
r.aboutLock.RLock()
defer r.aboutLock.RUnlock()
return r.about.ID return r.about.ID
} }
@@ -244,30 +254,43 @@ func (r *restclient) Address() string {
return r.address return r.address
} }
func (r *restclient) About() api.About { func (r *restclient) About(cached bool) (api.About, error) {
return r.about if cached {
return r.about, nil
}
about, err := r.info()
if err != nil {
return api.About{}, err
}
r.about = about
return about, nil
} }
func (r *restclient) Ping() (bool, time.Duration) { func (r *restclient) Ping() (time.Duration, error) {
req, err := http.NewRequest(http.MethodGet, r.address+"/ping", nil) req, err := http.NewRequest(http.MethodGet, r.address+"/ping", nil)
if err != nil { if err != nil {
return false, time.Duration(0) return time.Duration(0), err
} }
start := time.Now() start := time.Now()
status, body, err := r.request(req) status, body, err := r.request(req)
if err != nil { if err != nil {
return false, time.Since(start) return time.Duration(0), err
} }
defer body.Close() defer body.Close()
if status != 200 { if status != 200 {
return false, time.Since(start) return time.Duration(0), err
} }
return true, time.Since(start) io.ReadAll(body)
return time.Since(start), nil
} }
func (r *restclient) login() error { func (r *restclient) login() error {
@@ -363,8 +386,10 @@ func (r *restclient) login() error {
return fmt.Errorf("the core version (%s) is not supported, because a version %s is required", about.Version.Number, coreversion) return fmt.Errorf("the core version (%s) is not supported, because a version %s is required", about.Version.Number, coreversion)
} }
r.aboutLock.Lock()
r.version.connectedCore = v r.version.connectedCore = v
r.about = about r.about = about
r.aboutLock.Unlock()
return nil return nil
} }
@@ -375,6 +400,9 @@ func (r *restclient) checkVersion(method, path string) error {
return nil return nil
} }
r.aboutLock.RLock()
defer r.aboutLock.RUnlock()
if !c.Check(r.version.connectedCore) { if !c.Check(r.version.connectedCore) {
return fmt.Errorf("this method is only available in version %s of the core", c.String()) return fmt.Errorf("this method is only available in version %s of the core", c.String())
} }
@@ -383,6 +411,10 @@ func (r *restclient) checkVersion(method, path string) error {
} }
func (r *restclient) refresh() error { func (r *restclient) refresh() error {
if len(r.refreshToken) == 0 {
return fmt.Errorf("no refresh token defined")
}
req, err := http.NewRequest("GET", r.address+r.prefix+"/login/refresh", nil) req, err := http.NewRequest("GET", r.address+r.prefix+"/login/refresh", nil)
if err != nil { if err != nil {
return err return err

2
vendor/modules.txt vendored
View File

@@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2 # github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11 ## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece # github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e
## explicit; go 1.18 ## explicit; go 1.18
github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api github.com/datarhei/core-client-go/v16/api