Mirror of https://github.com/datarhei/core.git (synced 2025-10-04 15:42:57 +08:00)
Start cluster after core API has been started
The core API is now started before the cluster is started, so that the cluster endpoints can be reached during a cluster upgrade. If TLS is enabled, the (possibly stale) certificates are loaded from the KV store into the cache; otherwise the leader would have to be contacted via the cluster API, which might have changed.
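In short, cluster construction and cluster startup are split: New() only builds the object, the core API servers come up, and only afterwards does Start(ctx) run the actual setup under the configured startup timeout. The following is a minimal, self-contained sketch of that ordering (hypothetical names and a fake cluster implementation, not the actual datarhei/core types):

// Sketch of the new startup order, assuming simplified stand-in types.
package main

import (
	"context"
	"fmt"
	"time"
)

// Cluster mirrors the relevant part of the interface from the diff below.
type Cluster interface {
	Start(ctx context.Context) error
	Shutdown() error
}

type fakeCluster struct{}

// New only constructs the object; it no longer takes a context or runs setup.
func New() (Cluster, error) { return &fakeCluster{}, nil }

func (c *fakeCluster) Start(ctx context.Context) error {
	// Placeholder for waiting on leader election, certificate caching, etc.
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (c *fakeCluster) Shutdown() error { return nil }

// Stand-in for starting the HTTP/HTTPS API servers.
func startAPIServers() { fmt.Println("core API listening") }

func main() {
	cluster, err := New()
	if err != nil {
		panic(err)
	}

	// 1. The core API is up first, so cluster endpoints (and cached TLS
	//    certificates) are available while the cluster is still starting.
	startAPIServers()

	// 2. Only then is the cluster started, bounded by a startup timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := cluster.Start(ctx); err != nil {
		panic(fmt.Errorf("failed to start cluster: %w", err))
	}
	fmt.Println("cluster started")
}

With this split, an HTTPS-enabled API can already serve requests from cached certificates while Start(ctx) is still waiting for a leader, which is what the changes below implement.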
@@ -333,6 +333,43 @@ func (a *api) start(ctx context.Context) error {
 		}
 	}
 
+	if cfg.Debug.ForceGC > 0 {
+		ctx, cancel := context.WithCancel(ctx)
+		a.gcTickerStop = cancel
+
+		go func(ctx context.Context) {
+			ticker := time.NewTicker(time.Duration(cfg.Debug.ForceGC) * time.Second)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+					debug.FreeOSMemory()
+					/*
+						var mem runtime.MemStats
+						runtime.ReadMemStats(&mem)
+						a.log.logger.main.WithComponent("memory").Debug().WithFields(log.Fields{
+							"Sys":          float64(mem.Sys) / (1 << 20),
+							"HeapSys":      float64(mem.HeapSys) / (1 << 20),
+							"HeapAlloc":    float64(mem.HeapAlloc) / (1 << 20),
+							"HeapIdle":     float64(mem.HeapIdle) / (1 << 20),
+							"HeapInuse":    float64(mem.HeapInuse) / (1 << 20),
+							"HeapReleased": float64(mem.HeapReleased) / (1 << 20),
+						}).Log("")
+					*/
+				}
+			}
+		}(ctx)
+	}
+
+	if cfg.Debug.MemoryLimit > 0 {
+		debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
+	} else {
+		debug.SetMemoryLimit(math.MaxInt64)
+	}
+
 	resources, err := resources.New(resources.Config{
 		MaxCPU:    cfg.Resources.MaxCPUUsage,
 		MaxMemory: cfg.Resources.MaxMemoryUsage,

@@ -492,10 +529,7 @@ func (a *api) start(ctx context.Context) error {
 		})
 	}
 
-	ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
-	defer cancel()
-
-	cluster, err := cluster.New(ctx, cluster.Config{
+	cluster, err := cluster.New(cluster.Config{
 		ID:   cfg.ID,
 		Name: cfg.Name,
 		Path: filepath.Join(cfg.DB.Dir, "cluster"),

@@ -1656,45 +1690,20 @@ func (a *api) start(ctx context.Context) error {
 	// Wait for all servers to be started
 	wgStart.Wait()
 
-	if cfg.Debug.ForceGC > 0 {
-		ctx, cancel := context.WithCancel(ctx)
-		a.gcTickerStop = cancel
-
-		go func(ctx context.Context) {
-			ticker := time.NewTicker(time.Duration(cfg.Debug.ForceGC) * time.Second)
-			defer ticker.Stop()
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-ticker.C:
-					debug.FreeOSMemory()
-					/*
-						var mem runtime.MemStats
-						runtime.ReadMemStats(&mem)
-						a.log.logger.main.WithComponent("memory").Debug().WithFields(log.Fields{
-							"Sys":          float64(mem.Sys) / (1 << 20),
-							"HeapSys":      float64(mem.HeapSys) / (1 << 20),
-							"HeapAlloc":    float64(mem.HeapAlloc) / (1 << 20),
-							"HeapIdle":     float64(mem.HeapIdle) / (1 << 20),
-							"HeapInuse":    float64(mem.HeapInuse) / (1 << 20),
-							"HeapReleased": float64(mem.HeapReleased) / (1 << 20),
-						}).Log("")
-					*/
-				}
-			}
-		}(ctx)
-	}
-
-	if cfg.Debug.MemoryLimit > 0 {
-		debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
-	} else {
-		debug.SetMemoryLimit(math.MaxInt64)
-	}
-
 	// Start the restream processes
 	restream.Start()
 
+	// Start the cluster
+	if a.cluster != nil {
+		ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
+		defer cancel()
+
+		err := a.cluster.Start(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to start cluster: %w", err)
+		}
+	}
+
 	// Start the service
 	if a.service != nil {
 		a.service.Start()
@@ -18,6 +18,7 @@ import (
 )
 
 type Manager interface {
+	CacheManagedCertificate(ctx context.Context, hostnames []string)
 	AcquireCertificates(ctx context.Context, hostnames []string) error
 	ManageCertificates(ctx context.Context, hostnames []string) error
 	HTTPChallengeResolver(ctx context.Context, listenAddress string) error

@@ -166,6 +167,12 @@ func (m *manager) HTTPChallengeResolver(ctx context.Context, listenAddress strin
 	return nil
 }
 
+func (m *manager) CacheManagedCertificate(ctx context.Context, hostnames []string) {
+	for _, name := range hostnames {
+		m.config.CacheManagedCertificate(ctx, name)
+	}
+}
+
 // AcquireCertificates tries to acquire the certificates for the given hostnames synchronously.
 func (m *manager) AcquireCertificates(ctx context.Context, hostnames []string) error {
 	m.lock.Lock()
@@ -849,7 +849,7 @@ func (a *api) GetKV(c echo.Context) error {
 
 	a.logger.Debug().WithField("key", key).Log("Get key")
 
-	value, updatedAt, err := a.cluster.GetKV(origin, key)
+	value, updatedAt, err := a.cluster.GetKV(origin, key, false)
 	if err != nil {
 		if err == fs.ErrNotExist {
 			a.logger.Debug().WithError(err).WithField("key", key).Log("Get key: not found")
@@ -33,6 +33,9 @@ import (
 )
 
 type Cluster interface {
+	Start(ctx context.Context) error
+	Shutdown() error
+
 	// Address returns the raft address of this node
 	Address() string
 

@@ -55,8 +58,6 @@
 	Leave(origin, id string) error // gracefully remove a node from the cluster
 	Snapshot(origin string) (io.ReadCloser, error)
 
-	Shutdown() error
-
 	ListProcesses() []store.Process
 	GetProcess(id app.ProcessID) (store.Process, error)
 	AddProcess(origin string, config *app.Config) error

@@ -81,7 +82,7 @@
 
 	SetKV(origin, key, value string) error
 	UnsetKV(origin, key string) error
-	GetKV(origin, key string) (string, time.Time, error)
+	GetKV(origin, key string, stale bool) (string, time.Time, error)
 	ListKV(prefix string) map[string]store.Value
 
 	ProxyReader() proxy.ProxyReader

@@ -163,6 +164,7 @@
 	leaderLock sync.Mutex
 
 	isTLSRequired bool
+	clusterKVS    ClusterKVS
 	certManager   autocert.Manager
 
 	nodes map[string]clusternode.Node

@@ -178,7 +180,7 @@
 
 var ErrDegraded = errors.New("cluster is currently degraded")
 
-func New(ctx context.Context, config Config) (Cluster, error) {
+func New(config Config) (Cluster, error) {
 	c := &cluster{
 		id:   config.ID,
 		name: config.Name,

@@ -220,6 +222,11 @@
 	}
 
 	c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto
+	if c.isTLSRequired {
+		if len(c.config.Host.Name) == 0 {
+			return nil, fmt.Errorf("tls: at least one hostname must be configured")
+		}
+	}
 
 	host, port, err := gonet.SplitHostPort(c.config.Address)
 	if err != nil {

@@ -373,18 +380,51 @@ func New(ctx context.Context, config Config) (Cluster, error) {
 	go c.monitorLeadership()
 	go c.sentinel()
 
-	err = c.setup(ctx)
-	if err != nil {
-		c.Shutdown()
-		return nil, fmt.Errorf("failed to setup cluster: %w", err)
+	if c.isTLSRequired {
+		kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS"))
+		if err != nil {
+			return nil, fmt.Errorf("tls: cluster KVS: %w", err)
+		}
+
+		storage, err := clusterautocert.NewStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS"))
+		if err != nil {
+			return nil, fmt.Errorf("tls: certificate store: %w", err)
+		}
+
+		if len(c.config.TLS.Secret) != 0 {
+			storage = autocert.NewCryptoStorage(storage, autocert.NewCrypto(c.config.TLS.Secret))
+		}
+
+		manager, err := autocert.New(autocert.Config{
+			Storage:         storage,
+			DefaultHostname: c.config.Host.Name[0],
+			EmailAddress:    c.config.TLS.Email,
+			IsProduction:    !c.config.TLS.Staging,
+			Logger:          c.logger.WithComponent("Let's Encrypt"),
+		})
+		if err != nil {
+			return nil, fmt.Errorf("tls: certificate manager: %w", err)
+		}
+
+		c.clusterKVS = kvs
+		c.certManager = manager
 	}
 
 	return c, nil
 }
 
+func (c *cluster) Start(ctx context.Context) error {
+	err := c.setup(ctx)
+	if err != nil {
+		c.Shutdown()
+		return fmt.Errorf("failed to setup cluster: %w", err)
+	}
+
+	return nil
+}
+
 func (c *cluster) setup(ctx context.Context) error {
 	// Wait for a leader to be selected
 
 	c.logger.Info().Log("Waiting for a leader to be elected ...")
 
 	for {

@@ -404,8 +444,15 @@
 
 	c.logger.Info().Log("Leader has been elected")
 
-	// Wait for all cluster nodes to leave degraded mode
-
+	if c.certManager != nil {
+		// Load certificates into cache, in case we already have them in the KV store. It
+		// allows the API to serve requests. This requires a raft leader.
+		c.clusterKVS.AllowStaleKeys(true)
+		c.certManager.CacheManagedCertificate(context.Background(), c.config.Host.Name)
+		c.clusterKVS.AllowStaleKeys(false)
+	}
+
+	// Wait for all cluster nodes to leave degraded mode
 	c.logger.Info().Log("Waiting for cluster to become operational ...")
 
 	for {

@@ -432,7 +479,7 @@
 
 	c.logger.Info().Log("Cluster is operational")
 
-	if c.isTLSRequired {
+	if c.certManager != nil {
 		c.logger.Info().Log("Waiting for TLS certificates ...")
 
 		// Create certificate manager

@@ -445,41 +492,6 @@
 			return fmt.Errorf("no hostnames are configured")
 		}
 
-		kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS"))
-		if err != nil {
-			return fmt.Errorf("tls: cluster KVS: %w", err)
-		}
-
-		storage, err := clusterautocert.NewStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS"))
-		if err != nil {
-			return fmt.Errorf("tls: certificate store: %w", err)
-		}
-
-		if len(c.config.TLS.Secret) != 0 {
-			storage = autocert.NewCryptoStorage(storage, autocert.NewCrypto(c.config.TLS.Secret))
-		}
-
-		manager, err := autocert.New(autocert.Config{
-			Storage:         storage,
-			DefaultHostname: hostnames[0],
-			EmailAddress:    c.config.TLS.Email,
-			IsProduction:    !c.config.TLS.Staging,
-			Logger:          c.logger.WithComponent("Let's Encrypt"),
-		})
-		if err != nil {
-			return fmt.Errorf("tls: certificate manager: %w", err)
-		}
-
-		c.certManager = manager
-
-		resctx, rescancel := context.WithCancel(ctx)
-		defer rescancel()
-
-		err = manager.HTTPChallengeResolver(resctx, c.config.Address)
-		if err != nil {
-			return fmt.Errorf("tls: failed to start the HTTP challenge resolver: %w", err)
-		}
-
 		// We have to wait for all nodes to have the HTTP challenge resolver started
 		err = c.Barrier(ctx, "acme")
 		if err != nil {

@@ -488,13 +500,11 @@
 
 		// Acquire certificates, all nodes can do this at the same time because everything
 		// is synched via the storage.
-		err = manager.AcquireCertificates(ctx, hostnames)
+		err = c.certManager.AcquireCertificates(ctx, hostnames)
 		if err != nil {
 			return fmt.Errorf("tls: failed to acquire certificates: %w", err)
 		}
 
-		rescancel()
-
 		c.logger.Info().Log("TLS certificates acquired")
 	}
 

@@ -1113,7 +1123,7 @@ func verifyClusterVersion(v string) error {
 	}
 
 	if !Version.Equal(version) {
-		return fmt.Errorf("version %s not equal to expected version %s", version.String(), Version.String())
+		return fmt.Errorf("version %s not equal to my version %s", version.String(), Version.String())
 	}
 
 	return nil

@@ -1140,6 +1150,10 @@ func verifyClusterConfig(local, remote *config.Config) error {
 		return fmt.Errorf("cluster.emergency_leader_timeout_sec is different")
 	}
 
+	if local.Cluster.Debug.DisableFFmpegCheck != remote.Cluster.Debug.DisableFFmpegCheck {
+		return fmt.Errorf("cluster.debug.disable_ffmpeg_check is different")
+	}
+
 	if !local.API.Auth.Enable {
 		return fmt.Errorf("api.auth.enable must be true")
 	}
@@ -108,7 +108,8 @@ func (c *cluster) UnsetKV(origin, key string) error {
 	return c.applyCommand(cmd)
 }
 
-func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
+func (c *cluster) GetKV(origin, key string, stale bool) (string, time.Time, error) {
+	if !stale {
 	if ok, _ := c.IsClusterDegraded(); ok {
 		return "", time.Time{}, ErrDegraded
 	}

@@ -116,6 +117,7 @@ func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
 	if !c.IsRaftLeader() {
 		return c.forwarder.GetKV(origin, key)
 	}
+	}
 
 	value, err := c.store.GetFromKVS(key)
 	if err != nil {

@@ -131,12 +133,19 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value {
 	return storeValues
 }
 
+type ClusterKVS interface {
+	kvs.KVS
+
+	AllowStaleKeys(allow bool)
+}
+
 type clusterKVS struct {
 	cluster    Cluster
+	allowStale bool
 	logger     log.Logger
 }
 
-func NewClusterKVS(cluster Cluster, logger log.Logger) (kvs.KVS, error) {
+func NewClusterKVS(cluster Cluster, logger log.Logger) (ClusterKVS, error) {
 	s := &clusterKVS{
 		cluster: cluster,
 		logger:  logger,

@@ -149,6 +158,10 @@ func NewClusterKVS(cluster Cluster, logger log.Logger) (kvs.KVS, error) {
 	return s, nil
 }
 
+func (s *clusterKVS) AllowStaleKeys(allow bool) {
+	s.allowStale = allow
+}
+
 func (s *clusterKVS) CreateLock(name string, validUntil time.Time) (*kvs.Lock, error) {
 	s.logger.Debug().WithFields(log.Fields{
 		"name": name,

@@ -181,8 +194,11 @@ func (s *clusterKVS) UnsetKV(key string) error {
 }
 
 func (s *clusterKVS) GetKV(key string) (string, time.Time, error) {
-	s.logger.Debug().WithField("key", key).Log("Get KV")
-	return s.cluster.GetKV("", key)
+	s.logger.Debug().WithFields(log.Fields{
+		"key":   key,
+		"stale": s.allowStale,
+	}).Log("Get KV")
+	return s.cluster.GetKV("", key, s.allowStale)
 }
 
 func (s *clusterKVS) ListKV(prefix string) map[string]store.Value {

@@ -39,5 +39,5 @@ func ParseClusterVersion(version string) (ClusterVersion, error) {
 var Version = ClusterVersion{
 	Major: 1,
 	Minor: 0,
-	Patch: 0,
+	Patch: 2,
 }