From 022c5c1a6dd78f74efc70f9081650ad51008d82f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 11 Sep 2023 14:42:46 +0200 Subject: [PATCH] Emit warnings --- cluster/cluster.go | 9 ++++++++- cluster/node/node.go | 39 ++++++++++++++++++++++++++++++++------- cluster/proxy/node.go | 30 +++++++++++++++++++++++++----- resources/resources.go | 4 ++-- 4 files changed, 67 insertions(+), 15 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 8913e491..71b39fa6 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -960,7 +960,14 @@ func (c *cluster) trackNodeChanges() { logger.Warn().WithError(err).Log("Discovering cluster API address") } - node := clusternode.New(id, address) + node := clusternode.New(clusternode.Config{ + ID: id, + Address: address, + Logger: c.logger.WithComponent("ClusterNode").WithFields(log.Fields{ + "id": id, + "address": address, + }), + }) if err := verifyClusterVersion(node.Version()); err != nil { logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") diff --git a/cluster/node/node.go b/cluster/node/node.go index e650292b..dca02a2f 100644 --- a/cluster/node/node.go +++ b/cluster/node/node.go @@ -12,6 +12,7 @@ import ( "github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/ffmpeg/skills" + "github.com/datarhei/core/v16/log" ) type Node interface { @@ -50,22 +51,36 @@ type node struct { cancelPing context.CancelFunc proxyNode proxy.Node + + logger log.Logger } -func New(id, address string) Node { +type Config struct { + ID string + Address string + + Logger log.Logger +} + +func New(config Config) Node { n := &node{ - id: id, - address: address, + id: config.ID, + address: config.Address, version: "0.0.0", client: client.APIClient{ - Address: address, + Address: config.Address, Client: &http.Client{ Timeout: 5 * time.Second, }, }, + logger: config.Logger, } - if host, _, err := net.SplitHostPort(address); err == nil { + if n.logger == nil { + n.logger = log.New("") + } + + if host, _, err := net.SplitHostPort(n.address); err == nil { if addrs, err := net.LookupHost(host); err == nil { n.ips = addrs } @@ -75,7 +90,7 @@ func New(id, address string) Node { n.version = version } - n.start(id) + n.start(n.id) return n } @@ -95,7 +110,12 @@ func (n *node) start(id string) error { n.lastContactErr = fmt.Errorf("not started yet") address, config, err := n.CoreEssentials() - n.proxyNode = proxy.NewNode(id, address, config) + n.proxyNode = proxy.NewNode(proxy.NodeConfig{ + ID: id, + Address: address, + Config: config, + Logger: n.logger.WithComponent("ClusterProxyNode").WithField("address", address), + }) n.lastCoreContactErr = err @@ -114,6 +134,7 @@ func (n *node) start(id string) error { n.lastCoreContactErr = nil } else { n.lastCoreContactErr = err + n.logger.Error().WithError(err).Log("Failed to retrieve core essentials") } n.pingLock.Unlock() case <-ctx.Done(): @@ -313,6 +334,8 @@ func (n *node) ping(ctx context.Context) { n.pingLock.Lock() n.lastContactErr = err n.pingLock.Unlock() + + n.logger.Warn().WithError(err).Log("Failed to ping cluster API") } case <-ctx.Done(): return @@ -338,6 +361,8 @@ func (n *node) pingCore(ctx context.Context) { n.pingLock.Lock() n.lastCoreContactErr = fmt.Errorf("not connected to core api: %w", err) n.pingLock.Unlock() + + n.logger.Warn().WithError(err).Log("not connected to core API") } case <-ctx.Done(): return diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 50021970..4b06e135 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -13,6 +13,7 @@ import ( "time" "github.com/datarhei/core/v16/config" + "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" client "github.com/datarhei/core-client-go/v16" @@ -141,15 +142,30 @@ type node struct { rtmpAddress *url.URL hasSRT bool srtAddress *url.URL + + logger log.Logger } -func NewNode(id, address string, config *config.Config) Node { +type NodeConfig struct { + ID string + Address string + Config *config.Config + + Logger log.Logger +} + +func NewNode(config NodeConfig) Node { n := &node{ - id: id, - address: address, - config: config, + id: config.ID, + address: config.Address, + config: config.Config, state: stateDisconnected, - secure: strings.HasPrefix(address, "https://"), + secure: strings.HasPrefix(config.Address, "https://"), + logger: config.Logger, + } + + if n.logger == nil { + n.logger = log.New("") } n.resources.throttling = true @@ -362,6 +378,8 @@ func (n *node) pingPeer(ctx context.Context, wg *sync.WaitGroup) { n.stateLock.Lock() if err != nil { n.state = stateDisconnected + + n.logger.Warn().WithError(err).Log("Failed to retrieve about") } else { n.lastContact = time.Now() n.state = stateConnected @@ -407,6 +425,8 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { n.resources.err = err n.stateLock.Unlock() + n.logger.Warn().WithError(err).Log("Failed to retrieve metrics") + continue } diff --git a/resources/resources.go b/resources/resources.go index e6a371cc..6986cdb2 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -204,7 +204,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { r.lock.Lock() if r.isCPULimiting != doCPULimit { - r.logger.Debug().WithFields(log.Fields{ + r.logger.Warn().WithFields(log.Fields{ "enabled": doCPULimit, }).Log("Limiting CPU") @@ -212,7 +212,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { } if r.isMemoryLimiting != doMemoryLimit { - r.logger.Debug().WithFields(log.Fields{ + r.logger.Warn().WithFields(log.Fields{ "enabled": doMemoryLimit, }).Log("Limiting memory")