From 6f47f96f6e97ab5e75daeb5ddb137bc8eec1e545 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 26 Jun 2023 10:35:00 +0200 Subject: [PATCH] Get peer config via cluster API --- cluster/cluster.go | 10 ++++- cluster/node.go | 100 +++++++----------------------------------- cluster/proxy/node.go | 58 +++++++++++------------- 3 files changed, 52 insertions(+), 116 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index f2a31eb9..548345d0 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -721,7 +721,7 @@ func (c *cluster) checkClusterNodes() error { return fmt.Errorf("node %s has a different version: %s: %w", id, version, err) } - config, err := node.Config() + config, err := node.CoreConfig() if err != nil { return fmt.Errorf("node %s has no configuration available: %w", id, err) } @@ -783,6 +783,14 @@ func verifyClusterConfig(local, remote *config.Config) error { return fmt.Errorf("srt.passphrase is different") } + if local.Resources.MaxCPUUsage == 0 || remote.Resources.MaxCPUUsage == 0 { + return fmt.Errorf("resources.max_cpu_usage must be defined") + } + + if local.Resources.MaxMemoryUsage == 0 || remote.Resources.MaxMemoryUsage == 0 { + return fmt.Errorf("resources.max_memory_usage must be defined") + } + return nil } diff --git a/cluster/node.go b/cluster/node.go index 2a79e393..8a82df6d 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -10,7 +10,6 @@ import ( "github.com/datarhei/core/v16/cluster/client" "github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/config" - "github.com/datarhei/core/v16/config/copy" ) type clusterNode struct { @@ -45,13 +44,10 @@ func NewClusterNode(id, address string) *clusterNode { }, } - version, err := n.client.Version() - if err != nil { - version = "0.0.0" + if version, err := n.client.Version(); err == nil { + n.version = version } - n.version = version - n.start(id) return n @@ -74,8 +70,8 @@ func (n *clusterNode) start(id string) error { n.lastCoreContactErr = fmt.Errorf("not started yet") n.lastContactErr = fmt.Errorf("not started yet") - address, err := n.CoreAPIAddress() - n.proxyNode = proxy.NewNode(id, address) + address, config, err := n.CoreEssentials() + n.proxyNode = proxy.NewNode(id, address, config) if err != nil { n.lastCoreContactErr = err @@ -86,10 +82,10 @@ func (n *clusterNode) start(id string) error { for { select { case <-ticker.C: - address, err := n.CoreAPIAddress() + address, config, err := n.CoreEssentials() n.pingLock.Lock() if err == nil { - n.proxyNode.SetAddress(address) + n.proxyNode.SetEssentials(address, config) n.lastCoreContactErr = nil } else { n.lastCoreContactErr = err @@ -156,84 +152,22 @@ func (n *clusterNode) LastContact() time.Time { return n.lastContact } -func (n *clusterNode) Config() (*config.Config, error) { - if n.proxyNode == nil { - return nil, fmt.Errorf("proxy not available") +func (n *clusterNode) CoreEssentials() (string, *config.Config, error) { + address, err := n.CoreAPIAddress() + if err != nil { + return "", nil, err } - apiconfig := n.proxyNode.Config() - if apiconfig == nil { - return nil, fmt.Errorf("no config stored") + config, err := n.CoreConfig() + if err != nil { + return "", nil, err } - config := config.New(nil) + return address, config, nil +} - config.Version = apiconfig.Version - config.Version = apiconfig.Version - config.Version = apiconfig.Version - config.ID = apiconfig.ID - config.Name = apiconfig.Name - config.Address = apiconfig.Address - config.CheckForUpdates = apiconfig.CheckForUpdates - - config.Log = apiconfig.Log - config.DB = apiconfig.DB - config.Host = apiconfig.Host - config.API.ReadOnly = apiconfig.API.ReadOnly - config.API.Access = apiconfig.API.Access - config.API.Auth.Enable = apiconfig.API.Auth.Enable - config.API.Auth.DisableLocalhost = apiconfig.API.Auth.DisableLocalhost - config.API.Auth.Username = apiconfig.API.Auth.Username - config.API.Auth.Password = apiconfig.API.Auth.Password - config.API.Auth.JWT = apiconfig.API.Auth.JWT - config.TLS = apiconfig.TLS - config.Storage.MimeTypes = apiconfig.Storage.MimeTypes - config.Storage.Disk = apiconfig.Storage.Disk - config.Storage.Memory = apiconfig.Storage.Memory - config.Storage.CORS = apiconfig.Storage.CORS - config.RTMP = apiconfig.RTMP - config.SRT = apiconfig.SRT - config.FFmpeg = apiconfig.FFmpeg - config.Playout = apiconfig.Playout - config.Debug = apiconfig.Debug - config.Metrics = apiconfig.Metrics - config.Sessions = apiconfig.Sessions - config.Service = apiconfig.Service - config.Router = apiconfig.Router - config.Resources = apiconfig.Resources - config.Cluster = apiconfig.Cluster - - config.Log.Topics = copy.Slice(apiconfig.Log.Topics) - - config.Host.Name = copy.Slice(apiconfig.Host.Name) - - config.API.Access.HTTP.Allow = copy.Slice(apiconfig.API.Access.HTTP.Allow) - config.API.Access.HTTP.Block = copy.Slice(apiconfig.API.Access.HTTP.Block) - config.API.Access.HTTPS.Allow = copy.Slice(apiconfig.API.Access.HTTPS.Allow) - config.API.Access.HTTPS.Block = copy.Slice(apiconfig.API.Access.HTTPS.Block) - - //config.API.Auth.Auth0.Tenants = copy.TenantSlice(d.API.Auth.Auth0.Tenants) - - config.Storage.CORS.Origins = copy.Slice(apiconfig.Storage.CORS.Origins) - config.Storage.Disk.Cache.Types.Allow = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Allow) - config.Storage.Disk.Cache.Types.Block = copy.Slice(apiconfig.Storage.Disk.Cache.Types.Block) - //config.Storage.S3 = copy.Slice(d.Storage.S3) - - config.FFmpeg.Access.Input.Allow = copy.Slice(apiconfig.FFmpeg.Access.Input.Allow) - config.FFmpeg.Access.Input.Block = copy.Slice(apiconfig.FFmpeg.Access.Input.Block) - config.FFmpeg.Access.Output.Allow = copy.Slice(apiconfig.FFmpeg.Access.Output.Allow) - config.FFmpeg.Access.Output.Block = copy.Slice(apiconfig.FFmpeg.Access.Output.Block) - - config.Sessions.IPIgnoreList = copy.Slice(apiconfig.Sessions.IPIgnoreList) - - config.SRT.Log.Topics = copy.Slice(apiconfig.SRT.Log.Topics) - - config.Router.BlockedPrefixes = copy.Slice(apiconfig.Router.BlockedPrefixes) - config.Router.Routes = copy.StringMap(apiconfig.Router.Routes) - - config.Cluster.Peers = copy.Slice(apiconfig.Cluster.Peers) - - return config, nil +func (n *clusterNode) CoreConfig() (*config.Config, error) { + return n.client.CoreConfig() } func (n *clusterNode) CoreAPIAddress() (string, error) { diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 93c544d9..51f7e97f 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/restream/app" client "github.com/datarhei/core-client-go/v16" @@ -19,11 +20,11 @@ import ( ) type Node interface { - SetAddress(address string) + SetEssentials(address string, config *config.Config) IsConnected() (bool, error) Disconnect() - Config() *clientapi.ConfigV3 + Config() *config.Config StartFiles(updates chan<- NodeFiles) error StopFiles() @@ -124,7 +125,7 @@ type node struct { memLimit uint64 } - config *clientapi.ConfigV3 + config *config.Config state nodeState latency float64 // Seconds @@ -145,10 +146,11 @@ type node struct { srtAddress *url.URL } -func NewNode(id, address string) Node { +func NewNode(id, address string, config *config.Config) Node { n := &node{ id: id, address: address, + config: config, state: stateDisconnected, secure: strings.HasPrefix(address, "https://"), } @@ -191,11 +193,12 @@ func NewNode(id, address string) Node { return n } -func (n *node) SetAddress(address string) { +func (n *node) SetEssentials(address string, config *config.Config) { n.peerLock.Lock() defer n.peerLock.Unlock() n.address = address + n.config = config } func (n *node) connect(ctx context.Context) error { @@ -210,6 +213,10 @@ func (n *node) connect(ctx context.Context) error { return fmt.Errorf("no address provided") } + if n.config == nil { + return fmt.Errorf("config not available") + } + u, err := url.Parse(n.address) if err != nil { return fmt.Errorf("invalid address (%s): %w", n.address, err) @@ -235,34 +242,18 @@ func (n *node) connect(ctx context.Context) error { return fmt.Errorf("creating client failed (%s): %w", n.address, err) } - version, cfg, err := peer.Config() - if err != nil { - return err - } - - if version != 3 { - return fmt.Errorf("unsupported core config version: %d", version) - } - - config, ok := cfg.Config.(clientapi.ConfigV3) - if !ok { - return fmt.Errorf("failed to convert config to expected version") - } - - n.config = &config - n.httpAddress = u - if config.RTMP.Enable { + if n.config.RTMP.Enable { n.hasRTMP = true n.rtmpAddress = &url.URL{} n.rtmpAddress.Scheme = "rtmp" isHostIP := net.ParseIP(nodehost) != nil - address := config.RTMP.Address - if n.secure && config.RTMP.EnableTLS && !isHostIP { - address = config.RTMP.AddressTLS + address := n.config.RTMP.Address + if n.secure && n.config.RTMP.EnableTLS && !isHostIP { + address = n.config.RTMP.AddressTLS n.rtmpAddress.Scheme = "rtmps" } @@ -277,17 +268,17 @@ func (n *node) connect(ctx context.Context) error { n.rtmpAddress.Host = net.JoinHostPort(host, port) } - n.rtmpAddress = n.rtmpAddress.JoinPath(config.RTMP.App) + n.rtmpAddress = n.rtmpAddress.JoinPath(n.config.RTMP.App) } - if config.SRT.Enable { + if n.config.SRT.Enable { n.hasSRT = true n.srtAddress = &url.URL{} n.srtAddress.Scheme = "srt" - host, port, err := net.SplitHostPort(config.SRT.Address) + host, port, err := net.SplitHostPort(n.config.SRT.Address) if err != nil { - return fmt.Errorf("invalid srt address '%s': %w", config.SRT.Address, err) + return fmt.Errorf("invalid srt address '%s': %w", n.config.SRT.Address, err) } if len(host) == 0 { @@ -299,8 +290,8 @@ func (n *node) connect(ctx context.Context) error { v := url.Values{} v.Set("mode", "caller") - if len(config.SRT.Passphrase) != 0 { - v.Set("passphrase", config.SRT.Passphrase) + if len(n.config.SRT.Passphrase) != 0 { + v.Set("passphrase", n.config.SRT.Passphrase) } n.srtAddress.RawQuery = v.Encode() @@ -467,7 +458,10 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { } } -func (n *node) Config() *clientapi.ConfigV3 { +func (n *node) Config() *config.Config { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + return n.config }