From b5976f37f096a377161199bacf4f08dc68dc307f Mon Sep 17 00:00:00 2001
From: Ingo Oppermann
Date: Mon, 10 Jul 2023 16:24:08 +0200
Subject: [PATCH] Start cluster after core API has been started

The core API is now started before the cluster, so that the cluster API
endpoints are already reachable during a cluster upgrade. If TLS is
enabled, possibly stale certificates are loaded into the certificate
cache beforehand; otherwise the leader would have to be contacted via
the cluster API, which might have changed in the meantime.
---
 app/api/api.go       |  89 ++++++++++++++++++---------
 autocert/autocert.go |   7 +++
 cluster/api.go       |   2 +-
 cluster/cluster.go   | 114 ++++++++++++++++++++++++-------------
 cluster/kvs.go       |  40 ++++++++++-----
 cluster/version.go   |   2 +-
 6 files changed, 150 insertions(+), 104 deletions(-)
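Note (commentary, not part of the applied diff): the new startup order in a
minimal sketch, using the names introduced by this patch; startCore and the
abbreviated config fields are illustrative only, error handling is trimmed.

    func startCore(cfg Config) error {
        // New() no longer takes a context and no longer runs the setup; it
        // only prepares state and, with TLS enabled, the certificate manager.
        cl, err := cluster.New(cluster.Config{ID: cfg.ID, Name: cfg.Name})
        if err != nil {
            return err
        }

        // Bring up the HTTP/API servers here, so that the cluster API
        // endpoints are reachable, e.g. for nodes that are being upgraded.

        // Only then start the cluster, bounded by the configured timeout.
        ctx, cancel := context.WithTimeout(context.Background(),
            time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
        defer cancel()

        if err := cl.Start(ctx); err != nil {
            return fmt.Errorf("failed to start cluster: %w", err)
        }

        return nil
    }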
diff --git a/app/api/api.go b/app/api/api.go
index f55f0368..68358013 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -333,6 +333,43 @@ func (a *api) start(ctx context.Context) error {
         }
     }
 
+    if cfg.Debug.ForceGC > 0 {
+        ctx, cancel := context.WithCancel(ctx)
+        a.gcTickerStop = cancel
+
+        go func(ctx context.Context) {
+            ticker := time.NewTicker(time.Duration(cfg.Debug.ForceGC) * time.Second)
+            defer ticker.Stop()
+
+            for {
+                select {
+                case <-ctx.Done():
+                    return
+                case <-ticker.C:
+                    debug.FreeOSMemory()
+                    /*
+                        var mem runtime.MemStats
+                        runtime.ReadMemStats(&mem)
+                        a.log.logger.main.WithComponent("memory").Debug().WithFields(log.Fields{
+                            "Sys":          float64(mem.Sys) / (1 << 20),
+                            "HeapSys":      float64(mem.HeapSys) / (1 << 20),
+                            "HeapAlloc":    float64(mem.HeapAlloc) / (1 << 20),
+                            "HeapIdle":     float64(mem.HeapIdle) / (1 << 20),
+                            "HeapInuse":    float64(mem.HeapInuse) / (1 << 20),
+                            "HeapReleased": float64(mem.HeapReleased) / (1 << 20),
+                        }).Log("")
+                    */
+                }
+            }
+        }(ctx)
+    }
+
+    if cfg.Debug.MemoryLimit > 0 {
+        debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
+    } else {
+        debug.SetMemoryLimit(math.MaxInt64)
+    }
+
     resources, err := resources.New(resources.Config{
         MaxCPU:    cfg.Resources.MaxCPUUsage,
         MaxMemory: cfg.Resources.MaxMemoryUsage,
@@ -492,10 +529,7 @@ func (a *api) start(ctx context.Context) error {
         })
     }
 
-    ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
-    defer cancel()
-
-    cluster, err := cluster.New(ctx, cluster.Config{
+    cluster, err := cluster.New(cluster.Config{
         ID:   cfg.ID,
         Name: cfg.Name,
         Path: filepath.Join(cfg.DB.Dir, "cluster"),
@@ -1656,45 +1690,20 @@ func (a *api) start(ctx context.Context) error {
     // Wait for all servers to be started
     wgStart.Wait()
 
-    if cfg.Debug.ForceGC > 0 {
-        ctx, cancel := context.WithCancel(ctx)
-        a.gcTickerStop = cancel
-
-        go func(ctx context.Context) {
-            ticker := time.NewTicker(time.Duration(cfg.Debug.ForceGC) * time.Second)
-            defer ticker.Stop()
-            for {
-                select {
-                case <-ctx.Done():
-                    return
-                case <-ticker.C:
-                    debug.FreeOSMemory()
-                    /*
-                        var mem runtime.MemStats
-                        runtime.ReadMemStats(&mem)
-                        a.log.logger.main.WithComponent("memory").Debug().WithFields(log.Fields{
-                            "Sys":          float64(mem.Sys) / (1 << 20),
-                            "HeapSys":      float64(mem.HeapSys) / (1 << 20),
-                            "HeapAlloc":    float64(mem.HeapAlloc) / (1 << 20),
-                            "HeapIdle":     float64(mem.HeapIdle) / (1 << 20),
-                            "HeapInuse":    float64(mem.HeapInuse) / (1 << 20),
-                            "HeapReleased": float64(mem.HeapReleased) / (1 << 20),
-                        }).Log("")
-                    */
-                }
-            }
-        }(ctx)
-    }
-
-    if cfg.Debug.MemoryLimit > 0 {
-        debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
-    } else {
-        debug.SetMemoryLimit(math.MaxInt64)
-    }
-
     // Start the restream processes
     restream.Start()
 
+    // Start the cluster
+    if a.cluster != nil {
+        ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
+        defer cancel()
+
+        err := a.cluster.Start(ctx)
+        if err != nil {
+            return fmt.Errorf("failed to start cluster: %w", err)
+        }
+    }
+
     // Start the service
     if a.service != nil {
         a.service.Start()
diff --git a/autocert/autocert.go b/autocert/autocert.go
index fde6a2fd..c3f4029d 100644
--- a/autocert/autocert.go
+++ b/autocert/autocert.go
@@ -18,6 +18,7 @@ import (
 )
 
 type Manager interface {
+    CacheManagedCertificate(ctx context.Context, hostnames []string)
     AcquireCertificates(ctx context.Context, hostnames []string) error
     ManageCertificates(ctx context.Context, hostnames []string) error
     HTTPChallengeResolver(ctx context.Context, listenAddress string) error
@@ -166,6 +167,12 @@ func (m *manager) HTTPChallengeResolver(ctx context.Context, listenAddress strin
     return nil
 }
 
+func (m *manager) CacheManagedCertificate(ctx context.Context, hostnames []string) {
+    for _, name := range hostnames {
+        m.config.CacheManagedCertificate(ctx, name)
+    }
+}
+
 // AcquireCertificates tries to acquire the certificates for the given hostnames synchronously.
 func (m *manager) AcquireCertificates(ctx context.Context, hostnames []string) error {
     m.lock.Lock()
diff --git a/cluster/api.go b/cluster/api.go
index 23704ef8..809cfcf3 100644
--- a/cluster/api.go
+++ b/cluster/api.go
@@ -849,7 +849,7 @@ func (a *api) GetKV(c echo.Context) error {
 
     a.logger.Debug().WithField("key", key).Log("Get key")
 
-    value, updatedAt, err := a.cluster.GetKV(origin, key)
+    value, updatedAt, err := a.cluster.GetKV(origin, key, false)
     if err != nil {
         if err == fs.ErrNotExist {
             a.logger.Debug().WithError(err).WithField("key", key).Log("Get key: not found")
diff --git a/cluster/cluster.go b/cluster/cluster.go
index af151353..09f1884f 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -33,6 +33,9 @@ import (
 )
 
 type Cluster interface {
+    Start(ctx context.Context) error
+    Shutdown() error
+
     // Address returns the raft address of this node
     Address() string
 
@@ -55,8 +58,6 @@ type Cluster interface {
     Leave(origin, id string) error // gracefully remove a node from the cluster
     Snapshot(origin string) (io.ReadCloser, error)
 
-    Shutdown() error
-
     ListProcesses() []store.Process
     GetProcess(id app.ProcessID) (store.Process, error)
     AddProcess(origin string, config *app.Config) error
@@ -81,7 +82,7 @@ type Cluster interface {
 
     SetKV(origin, key, value string) error
     UnsetKV(origin, key string) error
-    GetKV(origin, key string) (string, time.Time, error)
+    GetKV(origin, key string, stale bool) (string, time.Time, error)
     ListKV(prefix string) map[string]store.Value
 
     ProxyReader() proxy.ProxyReader
@@ -163,6 +164,7 @@ type cluster struct {
     leaderLock sync.Mutex
 
     isTLSRequired bool
+    clusterKVS    ClusterKVS
     certManager   autocert.Manager
 
     nodes map[string]clusternode.Node
@@ -178,7 +180,7 @@ type cluster struct {
 
 var ErrDegraded = errors.New("cluster is currently degraded")
 
-func New(ctx context.Context, config Config) (Cluster, error) {
+func New(config Config) (Cluster, error) {
     c := &cluster{
         id:   config.ID,
         name: config.Name,
@@ -220,6 +222,11 @@ func New(ctx context.Context, config Config) (Cluster, error) {
     }
 
     c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto
+    if c.isTLSRequired {
+        if len(c.config.Host.Name) == 0 {
+            return nil, fmt.Errorf("tls: at least one hostname must be configured")
+        }
+    }
 
     host, port, err := gonet.SplitHostPort(c.config.Address)
     if err != nil {
@@ -373,18 +380,51 @@
     go c.monitorLeadership()
     go c.sentinel()
 
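+    // The TLS certificate manager is now created here in New(), while the
+    // actual cluster setup (leader election, certificate acquisition) has
+    // moved to Start().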
-    err = c.setup(ctx)
-    if err != nil {
-        c.Shutdown()
-        return nil, fmt.Errorf("failed to setup cluster: %w", err)
+    if c.isTLSRequired {
+        kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS"))
+        if err != nil {
+            return nil, fmt.Errorf("tls: cluster KVS: %w", err)
+        }
+
+        storage, err := clusterautocert.NewStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS"))
+        if err != nil {
+            return nil, fmt.Errorf("tls: certificate store: %w", err)
+        }
+
+        if len(c.config.TLS.Secret) != 0 {
+            storage = autocert.NewCryptoStorage(storage, autocert.NewCrypto(c.config.TLS.Secret))
+        }
+
+        manager, err := autocert.New(autocert.Config{
+            Storage:         storage,
+            DefaultHostname: c.config.Host.Name[0],
+            EmailAddress:    c.config.TLS.Email,
+            IsProduction:    !c.config.TLS.Staging,
+            Logger:          c.logger.WithComponent("Let's Encrypt"),
+        })
+        if err != nil {
+            return nil, fmt.Errorf("tls: certificate manager: %w", err)
+        }
+
+        c.clusterKVS = kvs
+        c.certManager = manager
     }
 
     return c, nil
 }
 
+func (c *cluster) Start(ctx context.Context) error {
+    err := c.setup(ctx)
+    if err != nil {
+        c.Shutdown()
+        return fmt.Errorf("failed to setup cluster: %w", err)
+    }
+
+    return nil
+}
+
 func (c *cluster) setup(ctx context.Context) error {
     // Wait for a leader to be selected
-
     c.logger.Info().Log("Waiting for a leader to be elected ...")
 
     for {
@@ -404,8 +444,15 @@ func (c *cluster) setup(ctx context.Context) error {
 
     c.logger.Info().Log("Leader has been elected")
 
-    // Wait for all cluster nodes to leave degraded mode
+    if c.certManager != nil {
+        // Load certificates into the cache in case we already have them in
+        // the KV store. This allows the API to serve requests. It requires a
+        // raft leader.
+        c.clusterKVS.AllowStaleKeys(true)
+        c.certManager.CacheManagedCertificate(context.Background(), c.config.Host.Name)
+        c.clusterKVS.AllowStaleKeys(false)
+    }
+
+    // Wait for all cluster nodes to leave degraded mode
     c.logger.Info().Log("Waiting for cluster to become operational ...")
 
     for {
@@ -432,7 +479,7 @@ func (c *cluster) setup(ctx context.Context) error {
 
     c.logger.Info().Log("Cluster is operational")
 
-    if c.isTLSRequired {
+    if c.certManager != nil {
         c.logger.Info().Log("Waiting for TLS certificates ...")
 
         // Create certificate manager
@@ -445,41 +492,6 @@ func (c *cluster) setup(ctx context.Context) error {
             return fmt.Errorf("no hostnames are configured")
         }
 
-        kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS"))
-        if err != nil {
-            return fmt.Errorf("tls: cluster KVS: %w", err)
-        }
-
-        storage, err := clusterautocert.NewStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS"))
-        if err != nil {
-            return fmt.Errorf("tls: certificate store: %w", err)
-        }
-
-        if len(c.config.TLS.Secret) != 0 {
-            storage = autocert.NewCryptoStorage(storage, autocert.NewCrypto(c.config.TLS.Secret))
-        }
-
-        manager, err := autocert.New(autocert.Config{
-            Storage:         storage,
-            DefaultHostname: hostnames[0],
-            EmailAddress:    c.config.TLS.Email,
-            IsProduction:    !c.config.TLS.Staging,
-            Logger:          c.logger.WithComponent("Let's Encrypt"),
-        })
-        if err != nil {
-            return fmt.Errorf("tls: certificate manager: %w", err)
-        }
-
-        c.certManager = manager
-
-        resctx, rescancel := context.WithCancel(ctx)
-        defer rescancel()
-
-        err = manager.HTTPChallengeResolver(resctx, c.config.Address)
-        if err != nil {
-            return fmt.Errorf("tls: failed to start the HTTP challenge resolver: %w", err)
-        }
-
         // We have to wait for all nodes to have the HTTP challenge resolver started
         err = c.Barrier(ctx, "acme")
         if err != nil {
@@ -488,13 +500,11 @@ func (c *cluster) setup(ctx context.Context) error {
 
         // Acquire certificates, all nodes can do this at the same time because everything
         // is synched via the storage.
-        err = manager.AcquireCertificates(ctx, hostnames)
+        err = c.certManager.AcquireCertificates(ctx, hostnames)
         if err != nil {
             return fmt.Errorf("tls: failed to acquire certificates: %w", err)
         }
 
-        rescancel()
-
         c.logger.Info().Log("TLS certificates acquired")
     }
 
@@ -1113,7 +1123,7 @@ func verifyClusterVersion(v string) error {
     }
 
     if !Version.Equal(version) {
-        return fmt.Errorf("version %s not equal to expected version %s", version.String(), Version.String())
+        return fmt.Errorf("version %s not equal to my version %s", version.String(), Version.String())
     }
 
     return nil
@@ -1140,6 +1150,10 @@ func verifyClusterConfig(local, remote *config.Config) error {
         return fmt.Errorf("cluster.emergency_leader_timeout_sec is different")
     }
 
+    if local.Cluster.Debug.DisableFFmpegCheck != remote.Cluster.Debug.DisableFFmpegCheck {
+        return fmt.Errorf("cluster.debug.disable_ffmpeg_check is different")
+    }
+
     if !local.API.Auth.Enable {
         return fmt.Errorf("api.auth.enable must be true")
     }
diff --git a/cluster/kvs.go b/cluster/kvs.go
index f730a438..361e4a4a 100644
--- a/cluster/kvs.go
+++ b/cluster/kvs.go
@@ -108,13 +108,15 @@ func (c *cluster) UnsetKV(origin, key string) error {
     return c.applyCommand(cmd)
 }
 
-func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
-    if ok, _ := c.IsClusterDegraded(); ok {
-        return "", time.Time{}, ErrDegraded
-    }
+func (c *cluster) GetKV(origin, key string, stale bool) (string, time.Time, error) {
+    if !stale {
+        if ok, _ := c.IsClusterDegraded(); ok {
+            return "", time.Time{}, ErrDegraded
+        }
 
-    if !c.IsRaftLeader() {
-        return c.forwarder.GetKV(origin, key)
+        if !c.IsRaftLeader() {
+            return c.forwarder.GetKV(origin, key)
+        }
     }
 
     value, err := c.store.GetFromKVS(key)
@@ -131,12 +133,19 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value {
     return storeValues
 }
 
-type clusterKVS struct {
-    cluster Cluster
-    logger  log.Logger
+type ClusterKVS interface {
+    kvs.KVS
+
+    AllowStaleKeys(allow bool)
 }
 
-func NewClusterKVS(cluster Cluster, logger log.Logger) (kvs.KVS, error) {
+type clusterKVS struct {
+    cluster    Cluster
+    allowStale bool
+    logger     log.Logger
+}
+
+func NewClusterKVS(cluster Cluster, logger log.Logger) (ClusterKVS, error) {
     s := &clusterKVS{
         cluster: cluster,
         logger:  logger,
@@ -149,6 +158,10 @@ func NewClusterKVS(cluster Cluster, logger log.Logger) (kvs.KVS, error) {
     return s, nil
 }
 
+func (s *clusterKVS) AllowStaleKeys(allow bool) {
+    s.allowStale = allow
+}
+
 func (s *clusterKVS) CreateLock(name string, validUntil time.Time) (*kvs.Lock, error) {
     s.logger.Debug().WithFields(log.Fields{
         "name": name,
@@ -181,8 +194,11 @@ func (s *clusterKVS) UnsetKV(key string) error {
 }
 
 func (s *clusterKVS) GetKV(key string) (string, time.Time, error) {
-    s.logger.Debug().WithField("key", key).Log("Get KV")
-    return s.cluster.GetKV("", key)
+    s.logger.Debug().WithFields(log.Fields{
+        "key":   key,
+        "stale": s.allowStale,
+    }).Log("Get KV")
+    return s.cluster.GetKV("", key, s.allowStale)
 }
 
 func (s *clusterKVS) ListKV(prefix string) map[string]store.Value {
diff --git a/cluster/version.go b/cluster/version.go
index 5d0de6c5..b3dba8a1 100644
--- a/cluster/version.go
+++ b/cluster/version.go
@@ -39,5 +39,5 @@ func ParseClusterVersion(version string) (ClusterVersion, error) {
 var Version = ClusterVersion{
     Major: 1,
     Minor: 0,
-    Patch: 0,
+    Patch: 2,
 }
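
Note (commentary, not part of the applied diff): a short sketch of how the new
AllowStaleKeys flag is meant to be used. With stale reads allowed, GetKV skips
the degraded check and the leader forwarding and reads directly from the local
store; this is what makes the early certificate cache load possible before the
cluster is operational. The helper below is hypothetical, the pattern mirrors
the cache-loading code in cluster.go above:

    // readValueDuringStartup is a hypothetical helper; it reads a value from
    // the cluster KVS while the cluster may still be degraded or leaderless.
    func readValueDuringStartup(kv ClusterKVS, key string) (string, error) {
        kv.AllowStaleKeys(true)        // reads skip degraded/leader checks
        defer kv.AllowStaleKeys(false) // restore consistent reads

        // Forwards stale=true to cluster.GetKV, which then serves the value
        // from the local store instead of contacting the leader.
        value, _, err := kv.GetKV(key)
        return value, err
    }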