From a728cc7839a99def9c0e9a436c8c9c58288790ff Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 14 Jun 2023 22:05:59 +0200 Subject: [PATCH] Fix clusterNode/proxyNode --- app/api/api.go | 16 +- cluster/cluster.go | 70 ++-- cluster/leader.go | 2 +- cluster/node.go | 145 +++++-- cluster/proxy/node.go | 362 +++++++++++------- cluster/proxy/proxy.go | 6 +- config/config.go | 6 +- go.mod | 2 +- go.sum | 12 + http/api/cluster.go | 2 +- http/handler/api/cluster.go | 4 + .../datarhei/core-client-go/v16/api/config.go | 29 +- .../datarhei/core-client-go/v16/client.go | 124 +++--- vendor/modules.txt | 2 +- 14 files changed, 494 insertions(+), 288 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 65f0e27c..ee9921d8 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -873,11 +873,17 @@ func (a *api) start() error { if len(config.Owner) == 0 { identity = a.iam.GetDefaultVerifier() } else { - identity, _ = a.iam.GetVerifier(config.Owner) + var err error = nil + identity, err = a.iam.GetVerifier(config.Owner) + if err != nil { + identity = nil + } } if identity != nil { u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) + } else { + u.User = url.User(config.Owner) } return u.String() @@ -891,11 +897,17 @@ func (a *api) start() error { if len(config.Owner) == 0 { identity = a.iam.GetDefaultVerifier() } else { - identity, _ = a.iam.GetVerifier(config.Owner) + var err error = nil + identity, err = a.iam.GetVerifier(config.Owner) + if err != nil { + identity = nil + } } if identity != nil { u.User = url.UserPassword(config.Owner, identity.GetServiceBasicAuth()) + } else { + u.User = url.User(config.Owner) } return u.String() diff --git a/cluster/cluster.go b/cluster/cluster.go index 539f17ec..dfe108d2 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -398,11 +398,11 @@ func (c *cluster) IsRaftLeader() bool { return c.isRaftLeader } -func (c *cluster) IsDegraded() bool { +func (c *cluster) IsDegraded() (bool, error) { c.stateLock.Lock() defer c.stateLock.Unlock() - return c.isDegraded + return c.isDegraded, c.isDegradedErr } func (c *cluster) Leave(origin, id string) error { @@ -608,7 +608,7 @@ func (c *cluster) Snapshot() (io.ReadCloser, error) { } func (c *cluster) trackNodeChanges() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { @@ -629,7 +629,7 @@ func (c *cluster) trackNodeChanges() { } for _, server := range servers { - id := server.ID + server.Address + id := server.ID _, ok := c.nodes[id] if !ok { @@ -643,18 +643,15 @@ func (c *cluster) trackNodeChanges() { logger.Warn().WithError(err).Log("Discovering cluster API address") } - cnode := NewClusterNode(address) + cnode := NewClusterNode(id, address) - if !verifyClusterVersion(cnode.Version()) { + if err := verifyClusterVersion(cnode.Version()); err != nil { logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") } - if err := verifyClusterConfig(c.config, cnode.Config()); err != nil { - logger.Warn().WithError(err).Log("Config mismatch. Cluster will end up in degraded mode") - } - if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil { - c.logger.Warn().WithError(err).Log("Adding node") + logger.Warn().WithError(err).Log("Adding node") + cnode.Stop() continue } @@ -700,35 +697,38 @@ func (c *cluster) checkClusterNodes() error { defer c.nodesLock.RUnlock() for id, node := range c.nodes { - if status, statusErr := node.Status(); status == "offline" { - return fmt.Errorf("node %s is offline: %w", id, statusErr) + if status, err := node.Status(); status == "offline" { + return fmt.Errorf("node %s is offline: %w", id, err) } version := node.Version() - if !verifyClusterVersion(version) { - return fmt.Errorf("node %s has a different version: %s", id, version) + if err := verifyClusterVersion(version); err != nil { + return fmt.Errorf("node %s has a different version: %s: %w", id, version, err) } - config := node.Config() - if configErr := verifyClusterConfig(c.config, config); configErr != nil { - return fmt.Errorf("node %s has a different configuration: %w", id, configErr) + config, err := node.Config() + if err != nil { + return fmt.Errorf("node %s has no configuration available: %w", id, err) + } + if err := verifyClusterConfig(c.config, config); err != nil { + return fmt.Errorf("node %s has a different configuration: %w", id, err) } } return nil } -func verifyClusterVersion(v string) bool { +func verifyClusterVersion(v string) error { version, err := ParseClusterVersion(v) if err != nil { - return false + return fmt.Errorf("parsing version %s: %w", v, err) } if !Version.Equal(version) { - return false + return fmt.Errorf("version %s not equal to expected version %s", version.String(), Version.String()) } - return true + return nil } func verifyClusterConfig(local, remote *config.Config) error { @@ -806,7 +806,7 @@ func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) { } func (c *cluster) AddProcess(origin string, config *app.Config) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -825,7 +825,7 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error { } func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -844,7 +844,7 @@ func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { } func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -864,7 +864,7 @@ func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Con } func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -939,7 +939,7 @@ func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) } func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -958,7 +958,7 @@ func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { } func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -978,7 +978,7 @@ func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) } func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -998,7 +998,7 @@ func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) } func (c *cluster) RemoveIdentity(origin string, name string) error { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { return ErrDegraded } @@ -1057,15 +1057,17 @@ type ClusterAbout struct { Nodes []proxy.NodeAbout Version ClusterVersion Degraded bool + DegradedErr error } func (c *cluster) About() (ClusterAbout, error) { - degraded := c.IsDegraded() + degraded, degradedErr := c.IsDegraded() about := ClusterAbout{ - ID: c.id, - Address: c.Address(), - Degraded: degraded, + ID: c.id, + Address: c.Address(), + Degraded: degraded, + DegradedErr: degradedErr, } if address, err := c.ClusterAPIAddress(""); err == nil { diff --git a/cluster/leader.go b/cluster/leader.go index 245e95fd..a868875c 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -334,7 +334,7 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval tim return case <-ticker.C: if !emergency { - if c.IsDegraded() { + if ok, _ := c.IsDegraded(); ok { break } diff --git a/cluster/node.go b/cluster/node.go index 3dd8df90..2a79e393 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -10,11 +10,14 @@ import ( "github.com/datarhei/core/v16/cluster/client" "github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/config" + "github.com/datarhei/core/v16/config/copy" ) type clusterNode struct { client client.APIClient + id string + address string version string lastContact time.Time lastContactErr error @@ -26,13 +29,13 @@ type clusterNode struct { runLock sync.Mutex cancelPing context.CancelFunc - proxyLock sync.Mutex - proxyNode proxy.Node - proxyNodeErr error + proxyNode proxy.Node } -func NewClusterNode(address string) *clusterNode { +func NewClusterNode(id, address string) *clusterNode { n := &clusterNode{ + id: id, + address: address, version: "0.0.0", client: client.APIClient{ Address: address, @@ -49,18 +52,12 @@ func NewClusterNode(address string) *clusterNode { n.version = version - p := proxy.NewNode(address) - err = p.Connect() - if err == nil { - n.proxyNode = p - } else { - n.proxyNodeErr = err - } + n.start(id) return n } -func (n *clusterNode) Start() error { +func (n *clusterNode) start(id string) error { n.runLock.Lock() defer n.runLock.Unlock() @@ -74,33 +71,39 @@ func (n *clusterNode) Start() error { go n.ping(ctx) go n.pingCore(ctx) - address, _ := n.CoreAPIAddress() + n.lastCoreContactErr = fmt.Errorf("not started yet") + n.lastContactErr = fmt.Errorf("not started yet") + + address, err := n.CoreAPIAddress() + n.proxyNode = proxy.NewNode(id, address) - p := proxy.NewNode(address) - err := p.Connect() if err != nil { - n.proxyNodeErr = err - - go func(ctx context.Context, address string) { - ticker := time.NewTicker(3 * time.Second) + n.lastCoreContactErr = err + go func(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - p := proxy.NewNode(address) - err := p.Connect() + address, err := n.CoreAPIAddress() + n.pingLock.Lock() + if err == nil { + n.proxyNode.SetAddress(address) + n.lastCoreContactErr = nil + } else { + n.lastCoreContactErr = err + } + n.pingLock.Unlock() + if err == nil { - n.proxyNode = p return } case <-ctx.Done(): return } } - }(ctx, address) - } else { - n.proxyNode = p + }(ctx) } return nil @@ -153,8 +156,84 @@ func (n *clusterNode) LastContact() time.Time { return n.lastContact } -func (n *clusterNode) Config() *config.Config { - return nil +func (n *clusterNode) Config() (*config.Config, error) { + if n.proxyNode == nil { + return nil, fmt.Errorf("proxy not available") + } + + apiconfig := n.proxyNode.Config() + if apiconfig == nil { + return nil, fmt.Errorf("no config stored") + } + + config := config.New(nil) + + config.Version = apiconfig.Version + config.Version = apiconfig.Version + config.Version = apiconfig.Version + config.ID = apiconfig.ID + config.Name = apiconfig.Name + config.Address = apiconfig.Address + config.CheckForUpdates = apiconfig.CheckForUpdates + + config.Log = apiconfig.Log + config.DB = apiconfig.DB + config.Host = apiconfig.Host + config.API.ReadOnly = apiconfig.API.ReadOnly + config.API.Access = apiconfig.API.Access + config.API.Auth.Enable = apiconfig.API.Auth.Enable + config.API.Auth.DisableLocalhost = apiconfig.API.Auth.DisableLocalhost + config.API.Auth.Username = apiconfig.API.Auth.Username + config.API.Auth.Password = apiconfig.API.Auth.Password + config.API.Auth.JWT = apiconfig.API.Auth.JWT + config.TLS = apiconfig.TLS + config.Storage.MimeTypes = apiconfig.Storage.MimeTypes + config.Storage.Disk = apiconfig.Storage.Disk + config.Storage.Memory = apiconfig.Storage.Memory + config.Storage.CORS = apiconfig.Storage.CORS + config.RTMP = apiconfig.RTMP + config.SRT = apiconfig.SRT + config.FFmpeg = apiconfig.FFmpeg + config.Playout = apiconfig.Playout + config.Debug = apiconfig.Debug + config.Metrics = apiconfig.Metrics + config.Sessions = apiconfig.Sessions + config.Service = apiconfig.Service + config.Router = apiconfig.Router + config.Resources = apiconfig.Resources + config.Cluster = apiconfig.Cluster + + config.Log.Topics = copy.Slice(apiconfig.Log.Topics) + + config.Host.Name = copy.Slice(apiconfig.Host.Name) + + config.API.Access.HTTP.Allow = copy.Slice(apiconfig.API.Access.HTTP.Allow) + config.API.Access.HTTP.Block = copy.Slice(apiconfig.API.Access.HTTP.Block) + config.API.Access.HTTPS.Allow = copy.Slice(apiconfig.API.Access.HTTPS.Allow) + config.API.Access.HTTPS.Block = copy.Slice(apiconfig.API.Access.HTTPS.Block) + + //config.API.Auth.Auth0.Tenants = copy.TenantSlice(d.API.Auth.Auth0.Tenants) + + config.Storage.CORS.Origins = copy.Slice(apiconfig.Storage.CORS.Origins) + config.Storage.Disk.Cache.Types.Allow = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Allow) + config.Storage.Disk.Cache.Types.Block = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Block) + //config.Storage.S3 = copy.Slice(d.Storage.S3) + + config.FFmpeg.Access.Input.Allow = copy.Slice(apiconfig.FFmpeg.Access.Input.Allow) + config.FFmpeg.Access.Input.Block = copy.Slice(apiconfig.FFmpeg.Access.Input.Block) + config.FFmpeg.Access.Output.Allow = copy.Slice(apiconfig.FFmpeg.Access.Output.Allow) + config.FFmpeg.Access.Output.Block = copy.Slice(apiconfig.FFmpeg.Access.Output.Block) + + config.Sessions.IPIgnoreList = copy.Slice(apiconfig.Sessions.IPIgnoreList) + + config.SRT.Log.Topics = copy.Slice(apiconfig.SRT.Log.Topics) + + config.Router.BlockedPrefixes = copy.Slice(apiconfig.Router.BlockedPrefixes) + config.Router.Routes = copy.StringMap(apiconfig.Router.Routes) + + config.Cluster.Peers = copy.Slice(apiconfig.Cluster.Peers) + + return config, nil } func (n *clusterNode) CoreAPIAddress() (string, error) { @@ -198,22 +277,16 @@ func (n *clusterNode) pingCore(ctx context.Context) { for { select { case <-ticker.C: - var err error - n.proxyLock.Lock() - if n.proxyNode == nil { - err = fmt.Errorf("can't connect to core api: %w", n.proxyNodeErr) - } else { - _, err = n.proxyNode.Ping() - } - n.proxyLock.Unlock() + _, err := n.proxyNode.IsConnected() if err == nil { n.pingLock.Lock() n.lastCoreContact = time.Now() + n.lastCoreContactErr = nil n.pingLock.Unlock() } else { n.pingLock.Lock() - n.lastCoreContactErr = err + n.lastCoreContactErr = fmt.Errorf("not connected to core api: %w", err) n.pingLock.Unlock() } case <-ctx.Done(): diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index b8074398..e81b355a 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -19,10 +19,11 @@ import ( ) type Node interface { - Connect() error + SetAddress(address string) + IsConnected() (bool, error) Disconnect() - Config() clientapi.ConfigV3 + Config() *clientapi.ConfigV3 StartFiles(updates chan<- NodeFiles) error StopFiles() @@ -98,10 +99,12 @@ const ( ) type node struct { + id string address string ips []string peer client.RestClient + peerErr error peerLock sync.RWMutex peerWg sync.WaitGroup disconnect context.CancelFunc @@ -117,7 +120,7 @@ type node struct { memLimit uint64 } - config clientapi.ConfigV3 + config *clientapi.ConfigV3 state nodeState latency float64 // Seconds @@ -138,17 +141,60 @@ type node struct { srtAddress *url.URL } -func NewNode(address string) Node { +func NewNode(id, address string) Node { n := &node{ + id: id, address: address, state: stateDisconnected, secure: strings.HasPrefix(address, "https://"), } + n.resources.throttling = true + n.resources.cpu = 100 + n.resources.ncpu = 1 + n.resources.cpuLimit = 100 + n.resources.mem = 0 + n.resources.memLimit = 0 + + ctx, cancel := context.WithCancel(context.Background()) + n.disconnect = cancel + + err := n.connect(ctx) + if err != nil { + n.peerErr = err + + go func(ctx context.Context) { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := n.connect(ctx) + if err == nil { + n.peerErr = nil + return + } else { + n.peerErr = err + } + case <-ctx.Done(): + return + } + } + }(ctx) + } + return n } -func (n *node) Connect() error { +func (n *node) SetAddress(address string) { + n.peerLock.Lock() + defer n.peerLock.Unlock() + + n.address = address +} + +func (n *node) connect(ctx context.Context) error { n.peerLock.Lock() defer n.peerLock.Unlock() @@ -156,6 +202,10 @@ func (n *node) Connect() error { return nil } + if len(n.address) == 0 { + return fmt.Errorf("no address provided") + } + u, err := url.Parse(n.address) if err != nil { return fmt.Errorf("invalid address: %w", err) @@ -195,7 +245,7 @@ func (n *node) Connect() error { return fmt.Errorf("failed to convert config to expected version") } - n.config = config + n.config = &config n.httpAddress = u @@ -237,129 +287,165 @@ func (n *node) Connect() error { n.peer = peer - ctx, cancel := context.WithCancel(context.Background()) - n.disconnect = cancel - n.peerWg.Add(2) - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer n.peerWg.Done() - - for { - select { - case <-ticker.C: - // Ping - latency, err := n.Ping() - - n.stateLock.Lock() - if err != nil { - n.state = stateDisconnected - } else { - n.lastContact = time.Now() - n.state = stateConnected - } - n.latency = n.latency*0.2 + latency.Seconds()*0.8 - n.stateLock.Unlock() - case <-ctx.Done(): - return - } - } - }(ctx) - - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer n.peerWg.Done() - - for { - select { - case <-ticker.C: - // Metrics - metrics, err := n.peer.Metrics(clientapi.MetricsQuery{ - Metrics: []clientapi.MetricsQueryMetric{ - {Name: "cpu_ncpu"}, - {Name: "cpu_idle"}, - {Name: "cpu_limit"}, - {Name: "cpu_throttling"}, - {Name: "mem_total"}, - {Name: "mem_free"}, - {Name: "mem_limit"}, - {Name: "mem_throttling"}, - }, - }) - - if err != nil { - n.stateLock.Lock() - n.resources.throttling = true - n.resources.cpu = 100 - n.resources.ncpu = 1 - n.resources.cpuLimit = 100 - n.resources.mem = 0 - n.resources.memLimit = 0 - n.state = stateDisconnected - n.stateLock.Unlock() - - continue - } - - cpu_ncpu := .0 - cpu_idle := .0 - cpu_limit := .0 - mem_total := uint64(0) - mem_free := uint64(0) - mem_limit := uint64(0) - throttling := .0 - - for _, x := range metrics.Metrics { - if x.Name == "cpu_idle" { - cpu_idle = x.Values[0].Value - } else if x.Name == "cpu_ncpu" { - cpu_ncpu = x.Values[0].Value - } else if x.Name == "cpu_limit" { - cpu_limit = x.Values[0].Value - } else if x.Name == "cpu_throttling" { - throttling += x.Values[0].Value - } else if x.Name == "mem_total" { - mem_total = uint64(x.Values[0].Value) - } else if x.Name == "mem_free" { - mem_free = uint64(x.Values[0].Value) - } else if x.Name == "mem_limit" { - mem_limit = uint64(x.Values[0].Value) - } else if x.Name == "mem_throttling" { - throttling += x.Values[0].Value - } - } - - n.stateLock.Lock() - if throttling > 0 { - n.resources.throttling = true - } else { - n.resources.throttling = false - } - n.resources.ncpu = cpu_ncpu - n.resources.cpu = (100 - cpu_idle) * cpu_ncpu - n.resources.cpuLimit = cpu_limit * cpu_ncpu - if mem_total != 0 { - n.resources.mem = mem_total - mem_free - n.resources.memLimit = mem_limit - } else { - n.resources.mem = 0 - n.resources.memLimit = 0 - } - n.lastContact = time.Now() - n.stateLock.Unlock() - case <-ctx.Done(): - return - } - } - }(ctx) + go n.pingPeer(ctx, &n.peerWg) + go n.updateResources(ctx, &n.peerWg) return nil } -func (n *node) Config() clientapi.ConfigV3 { +func (n *node) IsConnected() (bool, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + fmt.Printf("\n***** n.peer is nil\n") + return false, fmt.Errorf("not connected: %w", n.peerErr) + } + + return true, nil +} + +func (n *node) Disconnect() { + n.peerLock.Lock() + if n.disconnect != nil { + n.disconnect() + n.disconnect = nil + } + n.peerLock.Unlock() + + n.peerWg.Wait() + + n.peerLock.Lock() + n.peer = nil + n.peerErr = fmt.Errorf("disconnected") + n.peerLock.Unlock() +} + +func (n *node) pingPeer(ctx context.Context, wg *sync.WaitGroup) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer wg.Done() + + for { + select { + case <-ticker.C: + // Ping + latency, err := n.Ping() + + n.peerLock.Lock() + n.peerErr = err + n.peerLock.Unlock() + + n.stateLock.Lock() + if err != nil { + n.state = stateDisconnected + } else { + n.lastContact = time.Now() + n.state = stateConnected + } + n.latency = n.latency*0.2 + latency.Seconds()*0.8 + n.stateLock.Unlock() + case <-ctx.Done(): + return + } + } +} + +func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer wg.Done() + + for { + select { + case <-ticker.C: + // Metrics + metrics, err := n.peer.Metrics(clientapi.MetricsQuery{ + Metrics: []clientapi.MetricsQueryMetric{ + {Name: "cpu_ncpu"}, + {Name: "cpu_idle"}, + {Name: "cpu_limit"}, + {Name: "cpu_throttling"}, + {Name: "mem_total"}, + {Name: "mem_free"}, + {Name: "mem_limit"}, + {Name: "mem_throttling"}, + }, + }) + + n.peerLock.Lock() + n.peerErr = err + n.peerLock.Unlock() + + if err != nil { + n.stateLock.Lock() + n.resources.throttling = true + n.resources.cpu = 100 + n.resources.ncpu = 1 + n.resources.cpuLimit = 100 + n.resources.mem = 0 + n.resources.memLimit = 0 + n.state = stateDisconnected + n.stateLock.Unlock() + + continue + } + + cpu_ncpu := .0 + cpu_idle := .0 + cpu_limit := .0 + mem_total := uint64(0) + mem_free := uint64(0) + mem_limit := uint64(0) + throttling := .0 + + for _, x := range metrics.Metrics { + if x.Name == "cpu_idle" { + cpu_idle = x.Values[0].Value + } else if x.Name == "cpu_ncpu" { + cpu_ncpu = x.Values[0].Value + } else if x.Name == "cpu_limit" { + cpu_limit = x.Values[0].Value + } else if x.Name == "cpu_throttling" { + throttling += x.Values[0].Value + } else if x.Name == "mem_total" { + mem_total = uint64(x.Values[0].Value) + } else if x.Name == "mem_free" { + mem_free = uint64(x.Values[0].Value) + } else if x.Name == "mem_limit" { + mem_limit = uint64(x.Values[0].Value) + } else if x.Name == "mem_throttling" { + throttling += x.Values[0].Value + } + } + + n.stateLock.Lock() + if throttling > 0 { + n.resources.throttling = true + } else { + n.resources.throttling = false + } + n.resources.ncpu = cpu_ncpu + n.resources.cpu = (100 - cpu_idle) * cpu_ncpu + n.resources.cpuLimit = cpu_limit * cpu_ncpu + if mem_total != 0 { + n.resources.mem = mem_total - mem_free + n.resources.memLimit = mem_limit + } else { + n.resources.mem = 0 + n.resources.memLimit = 0 + } + n.lastContact = time.Now() + n.stateLock.Unlock() + case <-ctx.Done(): + return + } + } +} + +func (n *node) Config() *clientapi.ConfigV3 { return n.config } @@ -371,12 +457,7 @@ func (n *node) Ping() (time.Duration, error) { return 0, fmt.Errorf("not connected") } - ok, latency := n.peer.Ping() - var err error = nil - - if !ok { - err = fmt.Errorf("not connected") - } + latency, err := n.peer.Ping() return latency, err } @@ -400,22 +481,7 @@ func (n *node) AboutPeer() (clientapi.About, error) { return clientapi.About{}, fmt.Errorf("not connected") } - return n.peer.About(), nil -} - -func (n *node) Disconnect() { - n.peerLock.Lock() - if n.disconnect != nil { - n.disconnect() - n.disconnect = nil - } - n.peerLock.Unlock() - - n.peerWg.Wait() - - n.peerLock.Lock() - n.peer = nil - n.peerLock.Unlock() + return n.peer.About(false) } func (n *node) StartFiles(updates chan<- NodeFiles) error { @@ -471,7 +537,9 @@ func (n *node) About() NodeAbout { about, err := n.AboutPeer() if err != nil { return NodeAbout{ - State: stateDisconnected.String(), + ID: n.id, + Address: n.address, + State: stateDisconnected.String(), } } @@ -489,7 +557,7 @@ func (n *node) About() NodeAbout { } nodeAbout := NodeAbout{ - ID: about.ID, + ID: n.id, Name: about.Name, Address: n.address, State: state.String(), diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 29ef4b67..8cbf04e7 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -249,9 +249,9 @@ func (p *proxy) Resources() map[string]NodeResources { func (p *proxy) AddNode(id string, node Node) (string, error) { about := node.About() - if id != about.ID { - return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID) - } + //if id != about.ID { + // return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID) + //} p.lock.Lock() defer p.lock.Unlock() diff --git a/config/config.go b/config/config.go index 2a9e5c74..a899276a 100644 --- a/config/config.go +++ b/config/config.go @@ -287,9 +287,9 @@ func (d *Config) init() { d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false) d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false) - d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false) - d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false) - d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/go.mod b/go.mod index 387fd980..79753126 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.17.2 github.com/casbin/casbin/v2 v2.69.1 - github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece + github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e github.com/datarhei/gosrt v0.4.1 github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/fujiwara/shapeio v1.0.0 diff --git a/go.sum b/go.sum index 91ad86ef..eb3b80be 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/adhocore/gronx v1.1.2 h1:Hgm+d8SyGtn+rCoDkxZq3nLNFLLkzRGR7L2ziRRD1w8= github.com/adhocore/gronx v1.1.2/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= @@ -27,6 +29,7 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+ github.com/atrox/haikunatorgo/v2 v2.0.1 h1:FCVx2KL2YvZtI1rI9WeEHxeLRrKGr0Dd4wfCJiUXupc= github.com/atrox/haikunatorgo/v2 v2.0.1/go.mod h1:BBQmx2o+1Z5poziaHRgddAZKOpijwfKdAmMnSYlFK70= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -49,6 +52,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece h1:Gv+W986jLcMa/TOKg5YF3RMDlNDDyj7uHuH+mHP7xq8= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614130211-fb0f92af8ac9 h1:ntM1tymajXx92ydwi6RSiDG54aQV3cMOtlGRBT6p9Z8= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614130211-fb0f92af8ac9/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1:iQKqGTyIdCyO7kY/G5MCKhzt3xZ5YPRubbJskVp5EvQ= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs= github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= @@ -72,6 +79,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -154,11 +162,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -228,6 +238,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -280,6 +291,7 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= diff --git a/http/api/cluster.go b/http/api/cluster.go index 56253c23..86025d74 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -17,7 +17,6 @@ type ClusterNode struct { Latency float64 `json:"latency_ms"` // milliseconds State string `json:"state"` Resources ClusterNodeResources `json:"resources"` - Version string `json:"version"` } func (n *ClusterNode) Marshal(about proxy.NodeAbout) { @@ -73,6 +72,7 @@ type ClusterAbout struct { Nodes []ClusterNode `json:"nodes"` Version string `json:"version"` Degraded bool `json:"degraded"` + DegradedErr string `json:"degraded_error"` } type ClusterProcess struct { diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 62d793b3..1e1c05ff 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -77,6 +77,10 @@ func (h *ClusterHandler) About(c echo.Context) error { Degraded: state.Degraded, } + if state.DegradedErr != nil { + about.DegradedErr = state.DegradedErr.Error() + } + for _, n := range state.Raft.Server { about.Raft.Server = append(about.Raft.Server, api.ClusterRaftServer{ ID: n.ID, diff --git a/vendor/github.com/datarhei/core-client-go/v16/api/config.go b/vendor/github.com/datarhei/core-client-go/v16/api/config.go index aa3e186c..9f98ea3e 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/api/config.go +++ b/vendor/github.com/datarhei/core-client-go/v16/api/config.go @@ -407,8 +407,9 @@ type ConfigV3 struct { } `json:"output"` } `json:"access"` Log struct { - MaxLines int `json:"max_lines" format:"int"` - MaxHistory int `json:"max_history" format:"int"` + MaxLines int `json:"max_lines" format:"int"` + MaxHistory int `json:"max_history" format:"int"` + MaxMinimalHistory int `json:"max_minimal_history" format:"int"` } `json:"log"` } `json:"ffmpeg"` Playout struct { @@ -417,9 +418,10 @@ type ConfigV3 struct { MaxPort int `json:"max_port" format:"int"` } `json:"playout"` Debug struct { - Profiling bool `json:"profiling"` - ForceGC int `json:"force_gc" format:"int"` - MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"` + Profiling bool `json:"profiling"` + ForceGC int `json:"force_gc" format:"int"` // deprecated, use MemoryLimit instead + MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"` + AutoMaxProcs bool `json:"auto_max_procs"` } `json:"debug"` Metrics struct { Enable bool `json:"enable"` @@ -447,16 +449,17 @@ type ConfigV3 struct { UIPath string `json:"ui_path"` } `json:"router"` Resources struct { - MaxCPUUsage float64 `json:"max_cpu_usage"` - MaxMemoryUsage float64 `json:"max_memory_usage"` + MaxCPUUsage float64 `json:"max_cpu_usage"` // percent 0-100 + MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100 } `json:"resources"` Cluster struct { - Enable bool `json:"enable"` - Bootstrap bool `json:"bootstrap"` - Recover bool `json:"recover"` - Debug bool `json:"debug"` - Address string `json:"address"` - Peers []string `json:"peers"` + Enable bool `json:"enable"` + Debug bool `json:"debug"` + Address string `json:"address"` // ip:port + Peers []string `json:"peers"` + SyncInterval int64 `json:"sync_interval_sec" format:"int64"` // seconds + NodeRecoverTimeout int64 `json:"node_recover_timeout_sec" format:"int64"` // seconds + EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds } `json:"cluster"` } diff --git a/vendor/github.com/datarhei/core-client-go/v16/client.go b/vendor/github.com/datarhei/core-client-go/v16/client.go index 311c7cf2..f7f49811 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/client.go +++ b/vendor/github.com/datarhei/core-client-go/v16/client.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/datarhei/core-client-go/v16/api" @@ -38,13 +39,13 @@ type RestClient interface { // Address returns the address of the connected datarhei Core Address() string - Ping() (bool, time.Duration) + Ping() (time.Duration, error) - About() api.About // GET / + About(cached bool) (api.About, error) // GET / - Config() (int64, api.Config, error) // GET /config - ConfigSet(config interface{}) error // POST /config - ConfigReload() error // GET /config/reload + Config() (int64, api.Config, error) // GET /v3/config + ConfigSet(config interface{}) error // POST /v3/config + ConfigReload() error // GET /v3/config/reload Graph(query api.GraphQuery) (api.GraphResponse, error) // POST /graph @@ -66,7 +67,7 @@ type RestClient interface { FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path} FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path} - Log() ([]api.LogEvent, error) // GET /log + Log() ([]api.LogEvent, error) // GET /v3/log Metadata(key string) (api.Metadata, error) // GET /v3/metadata/{key} MetadataSet(key string, metadata api.Metadata) error // PUT /v3/metadata/{key} @@ -130,6 +131,7 @@ type restclient struct { auth0Token string client HTTPClient about api.About + aboutLock sync.RWMutex version struct { connectedCore *semver.Version @@ -152,42 +154,29 @@ func New(config Config) (RestClient, error) { } u, err := url.Parse(r.address) - if err != nil { - return nil, err + if err == nil { + username := u.User.Username() + if len(username) != 0 { + r.username = username + } + + if password, ok := u.User.Password(); ok { + r.password = password + } + + u.User = nil + u.RawQuery = "" + u.Fragment = "" + + r.address = u.String() } - username := u.User.Username() - if len(username) != 0 { - r.username = username - } - - if password, ok := u.User.Password(); ok { - r.password = password - } - - u.User = nil - u.RawQuery = "" - u.Fragment = "" - - r.address = u.String() - if r.client == nil { r.client = &http.Client{ Timeout: 15 * time.Second, } } - about, err := r.info() - if err != nil { - return nil, err - } - - r.about = about - - if r.about.App != coreapp { - return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", r.about.Name, coreapp) - } - mustNewConstraint := func(constraint string) *semver.Constraints { v, _ := semver.NewConstraint(constraint) return v @@ -198,20 +187,35 @@ func New(config Config) (RestClient, error) { "GET/api/v3/metrics": mustNewConstraint("^16.10.0"), } - if len(r.about.ID) != 0 { + about, err := r.info() + if err != nil { + return nil, err + } + + if about.App != coreapp { + return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", about.Name, coreapp) + } + + r.aboutLock.Lock() + r.about = about + r.aboutLock.Unlock() + + if len(about.ID) != 0 { c, _ := semver.NewConstraint(coreversion) - v, err := semver.NewVersion(r.about.Version.Number) + v, err := semver.NewVersion(about.Version.Number) if err != nil { return nil, err } if !c.Check(v) { - return nil, fmt.Errorf("the core version (%s) is not supported, because a version %s is required", r.about.Version.Number, coreversion) + return nil, fmt.Errorf("the core version (%s) is not supported, because a version %s is required", about.Version.Number, coreversion) } + r.aboutLock.Lock() r.version.connectedCore = v + r.aboutLock.Unlock() } else { - v, err := semver.NewVersion(r.about.Version.Number) + v, err := semver.NewVersion(about.Version.Number) if err != nil { return nil, err } @@ -228,11 +232,17 @@ func New(config Config) (RestClient, error) { return r, nil } -func (r restclient) String() string { +func (r *restclient) String() string { + r.aboutLock.RLock() + defer r.aboutLock.RUnlock() + return fmt.Sprintf("%s %s (%s) %s @ %s", r.about.Name, r.about.Version.Number, r.about.Version.Arch, r.about.ID, r.address) } func (r *restclient) ID() string { + r.aboutLock.RLock() + defer r.aboutLock.RUnlock() + return r.about.ID } @@ -244,30 +254,43 @@ func (r *restclient) Address() string { return r.address } -func (r *restclient) About() api.About { - return r.about +func (r *restclient) About(cached bool) (api.About, error) { + if cached { + return r.about, nil + } + + about, err := r.info() + if err != nil { + return api.About{}, err + } + + r.about = about + + return about, nil } -func (r *restclient) Ping() (bool, time.Duration) { +func (r *restclient) Ping() (time.Duration, error) { req, err := http.NewRequest(http.MethodGet, r.address+"/ping", nil) if err != nil { - return false, time.Duration(0) + return time.Duration(0), err } start := time.Now() status, body, err := r.request(req) if err != nil { - return false, time.Since(start) + return time.Duration(0), err } defer body.Close() if status != 200 { - return false, time.Since(start) + return time.Duration(0), err } - return true, time.Since(start) + io.ReadAll(body) + + return time.Since(start), nil } func (r *restclient) login() error { @@ -363,8 +386,10 @@ func (r *restclient) login() error { return fmt.Errorf("the core version (%s) is not supported, because a version %s is required", about.Version.Number, coreversion) } + r.aboutLock.Lock() r.version.connectedCore = v r.about = about + r.aboutLock.Unlock() return nil } @@ -375,6 +400,9 @@ func (r *restclient) checkVersion(method, path string) error { return nil } + r.aboutLock.RLock() + defer r.aboutLock.RUnlock() + if !c.Check(r.version.connectedCore) { return fmt.Errorf("this method is only available in version %s of the core", c.String()) } @@ -383,6 +411,10 @@ func (r *restclient) checkVersion(method, path string) error { } func (r *restclient) refresh() error { + if len(r.refreshToken) == 0 { + return fmt.Errorf("no refresh token defined") + } + req, err := http.NewRequest("GET", r.address+r.prefix+"/login/refresh", nil) if err != nil { return err diff --git a/vendor/modules.txt b/vendor/modules.txt index ed37f307..a6a82123 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.2 ## explicit; go 1.11 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230605095314-42546fbbbece +# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e ## explicit; go 1.18 github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16/api