diff --git a/app/api/api.go b/app/api/api.go
index 466ca4ec..bda644df 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/datarhei/core/v16/app"
+	"github.com/datarhei/core/v16/autocert"
 	"github.com/datarhei/core/v16/cluster"
 	"github.com/datarhei/core/v16/config"
 	configstore "github.com/datarhei/core/v16/config/store"
@@ -53,7 +54,6 @@ import (
 	"github.com/caddyserver/certmagic"
 	"github.com/lestrrat-go/strftime"
 	"go.uber.org/automaxprocs/maxprocs"
-	"go.uber.org/zap"
 )
 
 // The API interface is the implementation for the restreamer API.
@@ -473,7 +473,10 @@ func (a *api) start() error {
 		})
 	}
 
-	cluster, err := cluster.New(cluster.Config{
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	cluster, err := cluster.New(ctx, cluster.Config{
 		ID:   cfg.ID,
 		Name: cfg.Name,
 		Path: filepath.Join(cfg.DB.Dir, "cluster"),
@@ -482,7 +485,7 @@ func (a *api) start() error {
 		SyncInterval:           time.Duration(cfg.Cluster.SyncInterval) * time.Second,
 		NodeRecoverTimeout:     time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
 		EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
-		Config:                 cfg.Clone(),
+		CoreConfig:             cfg.Clone(),
 		CoreAPIAddress:         scheme + gonet.JoinHostPort(host, port),
 		CoreAPIUsername:        cfg.API.Auth.Username,
 		CoreAPIPassword:        cfg.API.Auth.Password,
@@ -496,6 +499,51 @@ func (a *api) start() error {
 		a.cluster = cluster
 	}
 
+	var autocertManager autocert.Manager
+
+	if cfg.TLS.Enable {
+		if cfg.TLS.Auto {
+			if len(cfg.Host.Name) == 0 {
+				return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
+			}
+
+			if a.cluster == nil {
+				manager, err := autocert.New(autocert.Config{
+					Storage: &certmagic.FileStorage{
+						Path: filepath.Join(cfg.DB.Dir, "cert"),
+					},
+					DefaultHostname: cfg.Host.Name[0],
+					EmailAddress:    cfg.TLS.Email,
+					IsProduction:    false,
+					Logger:          a.log.logger.core.WithComponent("Let's Encrypt"),
+				})
+				if err != nil {
+					return fmt.Errorf("failed to initialize autocert manager: %w", err)
+				}
+
+				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+				err = manager.AcquireCertificates(ctx, cfg.Address, cfg.Host.Name)
+				cancel()
+
+				autocertManager = manager
+
+				if err != nil {
+					a.log.logger.core.Error().WithError(err).Log("Failed to acquire certificates")
+					a.log.logger.core.Warn().Log("Continuing with disabled TLS")
+					autocertManager = nil
+					cfg.TLS.Enable = false
+				} else {
+					cfg.TLS.CertFile = ""
+					cfg.TLS.KeyFile = ""
+				}
+			} else {
+				autocertManager = a.cluster.CertManager()
+			}
+		} else {
+			a.log.logger.core.Info().Log("Enabling TLS with cert and key files")
+		}
+	}
+
 	{
 		superuser := iamidentity.User{
 			Name: cfg.API.Auth.Username,
@@ -1222,108 +1270,6 @@ func (a *api) start() error {
 		a.cache = cache
 	}
 
-	var autocertManager *certmagic.Config
-
-	if cfg.TLS.Enable {
-		if cfg.TLS.Auto {
-			if len(cfg.Host.Name) == 0 {
-				return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
-			}
-
-			certmagic.Default.Storage = &certmagic.FileStorage{
-				Path: filepath.Join(cfg.DB.Dir, "cert"),
-			}
-			certmagic.Default.DefaultServerName = cfg.Host.Name[0]
-			certmagic.Default.Logger = zap.NewNop()
-
-			certmagic.DefaultACME.Agreed = true
-			certmagic.DefaultACME.Email = cfg.TLS.Email
-			certmagic.DefaultACME.CA = certmagic.LetsEncryptProductionCA
-			certmagic.DefaultACME.DisableHTTPChallenge = false
-			certmagic.DefaultACME.DisableTLSALPNChallenge = true
-			certmagic.DefaultACME.Logger = zap.NewNop()
-
-			magic := certmagic.NewDefault()
-			acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
-			acme.Logger = zap.NewNop()
-
-			magic.Issuers = []certmagic.Issuer{acme}
-			magic.Logger = zap.NewNop()
-
-			autocertManager = magic
-
-			// Start temporary http server on configured port
-			tempserver := &gohttp.Server{
-				Addr: cfg.Address,
-				Handler: acme.HTTPChallengeHandler(gohttp.HandlerFunc(func(w gohttp.ResponseWriter, r *gohttp.Request) {
-					w.WriteHeader(gohttp.StatusNotFound)
-				})),
-				ReadTimeout:    10 * time.Second,
-				WriteTimeout:   10 * time.Second,
-				MaxHeaderBytes: 1 << 20,
-			}
-
-			wg := sync.WaitGroup{}
-			wg.Add(1)
-
-			go func() {
-				tempserver.ListenAndServe()
-				wg.Done()
-			}()
-
-			var certerror bool
-
-			// For each domain, get the certificate
-			for _, host := range cfg.Host.Name {
-				logger := a.log.logger.core.WithComponent("Let's Encrypt").WithField("host", host)
-				logger.Info().Log("Acquiring certificate ...")
-
-				ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Minute))
-
-				err := autocertManager.ManageSync(ctx, []string{host})
-
-				cancel()
-
-				if err != nil {
-					logger.Error().WithField("error", err).Log("Failed to acquire certificate")
-					certerror = true
-					/*
-						problems, err := letsdebug.Check(host, letsdebug.HTTP01)
-						if err != nil {
-							logger.Error().WithField("error", err).Log("Failed to debug certificate acquisition")
-						}
-
-						for _, p := range problems {
-							logger.Error().WithFields(log.Fields{
-								"name":   p.Name,
-								"detail": p.Detail,
-							}).Log(p.Explanation)
-						}
-					*/
-					break
-				}
-
-				logger.Info().Log("Successfully acquired certificate")
-			}
-
-			// Shut down the temporary http server
-			tempserver.Close()
-
-			wg.Wait()
-
-			if certerror {
-				a.log.logger.core.Warn().Log("Continuing with disabled TLS")
-				autocertManager = nil
-				cfg.TLS.Enable = false
-			} else {
-				cfg.TLS.CertFile = ""
-				cfg.TLS.KeyFile = ""
-			}
-		} else {
-			a.log.logger.core.Info().Log("Enabling TLS with cert and key files")
-		}
-	}
-
 	if cfg.RTMP.Enable {
 		a.log.logger.rtmp = a.log.logger.core.WithComponent("RTMP").WithField("address", cfg.RTMP.Address)
 
@@ -1559,9 +1505,7 @@ func (a *api) start() error {
 			a.mainserver.TLSConfig = &tls.Config{
 				GetCertificate: autocertManager.GetCertificate,
 			}
-
-			acme := autocertManager.Issuers[0].(*certmagic.ACMEIssuer)
-			a.sidecarserver.Handler = acme.HTTPChallengeHandler(sidecarserverhandler)
+			a.sidecarserver.Handler = autocertManager.HTTPChallengeHandler(sidecarserverhandler)
 		}
 
 		wgStart.Add(1)
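Note on the api.go changes: certificate handling moves behind the new autocert.Manager interface (introduced in the next file), so the server wiring shrinks to two calls. A minimal sketch of how a consumer is meant to wire the manager into its listeners; runServers and handler are hypothetical names, only GetCertificate and HTTPChallengeHandler come from the interface:

    package main

    import (
        "crypto/tls"
        "net/http"

        "github.com/datarhei/core/v16/autocert"
    )

    // runServers is a hypothetical helper mirroring what start() does above:
    // the TLS listener pulls certificates from the manager via SNI, while a
    // plain-HTTP sidecar answers ACME HTTP-01 challenges and falls through
    // to the normal handler for everything else.
    func runServers(manager autocert.Manager, handler http.Handler) error {
        httpServer := &http.Server{
            Addr:    ":80",
            Handler: manager.HTTPChallengeHandler(handler),
        }

        tlsServer := &http.Server{
            Addr:    ":443",
            Handler: handler,
            TLSConfig: &tls.Config{
                GetCertificate: manager.GetCertificate,
            },
        }

        go httpServer.ListenAndServe()

        // Cert and key paths stay empty: certificates come from GetCertificate.
        return tlsServer.ListenAndServeTLS("", "")
    }

This also explains why the diff empties cfg.TLS.CertFile and cfg.TLS.KeyFile after a successful acquisition: the files are no longer the source of the certificates.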
diff --git a/autocert/autocert.go b/autocert/autocert.go
new file mode 100644
index 00000000..87dac54b
--- /dev/null
+++ b/autocert/autocert.go
@@ -0,0 +1,126 @@
+package autocert
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/datarhei/core/v16/log"
+
+	"github.com/caddyserver/certmagic"
+	"go.uber.org/zap"
+)
+
+type Manager interface {
+	AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error
+	GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error)
+	HTTPChallengeHandler(h http.Handler) http.Handler
+}
+
+type Config struct {
+	Storage         certmagic.Storage
+	DefaultHostname string
+	EmailAddress    string
+	IsProduction    bool
+	Logger          log.Logger
+}
+
+type manager struct {
+	config *certmagic.Config
+
+	logger log.Logger
+}
+
+func New(config Config) (Manager, error) {
+	m := &manager{
+		logger: config.Logger,
+	}
+
+	if m.logger == nil {
+		m.logger = log.New("")
+	}
+
+	certmagic.Default.Storage = config.Storage
+	certmagic.Default.DefaultServerName = config.DefaultHostname
+	certmagic.Default.Logger = zap.NewNop()
+
+	ca := certmagic.LetsEncryptStagingCA
+	if config.IsProduction {
+		ca = certmagic.LetsEncryptProductionCA
+	}
+
+	certmagic.DefaultACME.Agreed = true
+	certmagic.DefaultACME.Email = config.EmailAddress
+	certmagic.DefaultACME.CA = ca
+	certmagic.DefaultACME.DisableHTTPChallenge = false
+	certmagic.DefaultACME.DisableTLSALPNChallenge = true
+	certmagic.DefaultACME.Logger = zap.NewNop()
+
+	magic := certmagic.NewDefault()
+	acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
+	acme.Logger = zap.NewNop()
+
+	magic.Issuers = []certmagic.Issuer{acme}
+	magic.Logger = zap.NewNop()
+
+	m.config = magic
+
+	return m, nil
+}
+
+func (m *manager) HTTPChallengeHandler(h http.Handler) http.Handler {
+	acme := m.config.Issuers[0].(*certmagic.ACMEIssuer)
+	return acme.HTTPChallengeHandler(h)
+}
+
+func (m *manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error) {
+	return m.config.GetCertificate(hello)
+}
+
+func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error {
+	acme := m.config.Issuers[0].(*certmagic.ACMEIssuer)
+
+	// Start temporary http server on configured port
+	tempserver := &http.Server{
+		Addr: listenAddress,
+		Handler: acme.HTTPChallengeHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusNotFound)
+		})),
+		ReadTimeout:    10 * time.Second,
+		WriteTimeout:   10 * time.Second,
+		MaxHeaderBytes: 1 << 20,
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	go func() {
+		tempserver.ListenAndServe()
+		wg.Done()
+	}()
+
+	var certerr error
+
+	// Get the certificates
+	logger := m.logger.WithField("hostnames", hostnames)
+	logger.Info().Log("Acquiring certificate ...")
+
+	err := m.config.ManageSync(ctx, hostnames)
+
+	if err != nil {
+		certerr = fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(hostnames, ","), err)
+	} else {
+		logger.Info().Log("Successfully acquired certificate")
+	}
+
+	// Shut down the temporary http server
+	tempserver.Close()
+
+	wg.Wait()
+
+	return certerr
+}
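The package is deliberately small: New configures certmagic once, and AcquireCertificates performs a one-shot HTTP-01 acquisition on the given listen address. A usage sketch with assumed hostnames and paths (none of the values below are in the diff):

    package main

    import (
        "context"
        "time"

        "github.com/caddyserver/certmagic"
        "github.com/datarhei/core/v16/autocert"
    )

    func main() {
        // A minimal sketch, assuming example.com and /data/cert are yours.
        manager, err := autocert.New(autocert.Config{
            Storage:         &certmagic.FileStorage{Path: "/data/cert"},
            DefaultHostname: "example.com",
            EmailAddress:    "admin@example.com",
            IsProduction:    false, // staging CA; set true for trusted certificates
        })
        if err != nil {
            panic(err)
        }

        // AcquireCertificates starts a temporary HTTP server on the given
        // address for the HTTP-01 challenge, so the port must be reachable
        // from the ACME CA.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()

        if err := manager.AcquireCertificates(ctx, ":80", []string{"example.com"}); err != nil {
            // The caller decides whether to continue without TLS (as api.go
            // does) or to abort start-up (as cluster.go does).
            panic(err)
        }
    }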
diff --git a/cluster/api.go b/cluster/api.go
index 54a79ca1..b8943255 100644
--- a/cluster/api.go
+++ b/cluster/api.go
@@ -104,6 +104,21 @@ func NewAPI(config APIConfig) (API, error) {
 		return c.JSON(http.StatusOK, Version.String())
 	})
 
+	a.router.GET("/v1/ready", func(c echo.Context) error {
+		origin := c.Request().Header.Get("X-Cluster-Origin")
+
+		if origin == a.id {
+			return Err(http.StatusLoopDetected, "", "breaking circuit")
+		}
+
+		err := a.cluster.IsReady(origin)
+		if err != nil {
+			return Err(http.StatusLocked, "", "not ready yet")
+		}
+
+		return c.JSON(http.StatusOK, "OK")
+	})
+
 	a.router.POST("/v1/server", a.AddServer)
 
 	a.router.DELETE("/v1/server/:id", a.RemoveServer)
diff --git a/cluster/client/client.go b/cluster/client/client.go
index f1779f84..ce7f8758 100644
--- a/cluster/client/client.go
+++ b/cluster/client/client.go
@@ -253,6 +253,12 @@ func (c *APIClient) Snapshot() (io.ReadCloser, error) {
 	return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "")
 }
 
+func (c *APIClient) IsReady(origin string) error {
+	_, err := c.call(http.MethodGet, "/v1/ready", "application/json", nil, origin)
+
+	return err
+}
+
 func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) {
 	if len(c.Address) == 0 {
 		return nil, fmt.Errorf("no address defined")
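Together, the /v1/ready route (cluster/api.go) and APIClient.IsReady (client.go) let a follower block until the leader reports readiness; the X-Cluster-Origin header breaks forwarding loops with 508 Loop Detected instead of letting a request circulate. A sketch of the intended polling pattern, with waitUntilReady as a hypothetical helper:

    package main

    import (
        "context"
        "time"

        apiclient "github.com/datarhei/core/v16/cluster/client"
    )

    // waitUntilReady polls IsReady until the leader stops returning an error
    // or the context expires. The origin ID travels in the X-Cluster-Origin
    // header so a request that loops back to its sender is rejected.
    func waitUntilReady(ctx context.Context, client *apiclient.APIClient, origin string) error {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
            if err := client.IsReady(origin); err == nil {
                return nil
            }

            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
            }
        }
    }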
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 085703de..ff72703c 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -8,10 +8,12 @@ import (
 	"io"
 	gonet "net"
 	"net/url"
+	"sort"
 	"strconv"
 	"sync"
 	"time"
 
+	"github.com/datarhei/core/v16/autocert"
 	apiclient "github.com/datarhei/core/v16/cluster/client"
 	"github.com/datarhei/core/v16/cluster/forwarder"
 	clusteriam "github.com/datarhei/core/v16/cluster/iam"
@@ -42,6 +44,7 @@ type Cluster interface {
 	CoreConfig() *config.Config
 
 	About() (ClusterAbout, error)
+	IsReady(origin string) error
 
 	Join(origin, id, raftAddress, peerAddress string) error
 	Leave(origin, id string) error // gracefully remove a node from the cluster
@@ -77,6 +80,7 @@ type Cluster interface {
 	ListKV(prefix string) map[string]store.Value
 
 	ProxyReader() proxy.ProxyReader
+	CertManager() autocert.Manager
 }
 
 type Peer struct {
@@ -99,7 +103,7 @@ type Config struct {
 	CoreAPIUsername string // Username for the core API
 	CoreAPIPassword string // Password for the core API
 
-	Config *config.Config
+	CoreConfig *config.Config
 
 	IPLimiter net.IPLimiter
 	Logger    log.Logger
@@ -138,22 +142,29 @@ type cluster struct {
 	config      *config.Config
 	coreAddress string
 
-	isDegraded    bool
-	isDegradedErr error
-	stateLock     sync.Mutex
+	isDegraded        bool
+	isDegradedErr     error
+	isCoreDegraded    bool
+	isCoreDegradedErr error
+	stateLock         sync.Mutex
 
 	isRaftLeader  bool
 	hasRaftLeader bool
 	isLeader      bool
 	leaderLock    sync.Mutex
 
+	isTLSRequired bool
+	certManager   autocert.Manager
+
 	nodes     map[string]*clusterNode
 	nodesLock sync.RWMutex
+
+	ready bool
 }
 
 var ErrDegraded = errors.New("cluster is currently degraded")
 
-func New(config Config) (Cluster, error) {
+func New(ctx context.Context, config Config) (Cluster, error) {
 	c := &cluster{
 		id:   config.ID,
 		name: config.Name,
@@ -169,7 +180,7 @@ func New(config Config) (Cluster, error) {
 		nodeRecoverTimeout:     config.NodeRecoverTimeout,
 		emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
 
-		config: config.Config,
+		config: config.CoreConfig,
 
 		nodes: map[string]*clusterNode{},
 	}
@@ -177,6 +188,8 @@ func New(config Config) (Cluster, error) {
 		return nil, fmt.Errorf("the core config must be provided")
 	}
 
+	c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto
+
 	u, err := url.Parse(config.CoreAPIAddress)
 	if err != nil {
 		return nil, fmt.Errorf("invalid core API address: %w", err)
@@ -312,6 +325,112 @@ func New(config Config) (Cluster, error) {
 	go c.monitorLeadership()
 	go c.sentinel()
 
+	// Wait for a leader to be selected
+
+	c.logger.Info().Log("Waiting for a leader to be elected ...")
+
+	for {
+		leader := c.raft.Leader()
+		if len(leader) != 0 {
+			break
+		}
+
+		select {
+		case <-ctx.Done():
+			c.Shutdown()
+			return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
+		default:
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	c.logger.Info().Log("Leader has been elected")
+
+	// Wait for cluster to leave degraded mode
+
+	c.logger.Info().Log("Waiting for cluster to become operational ...")
+
+	for {
+		ok, _ := c.IsClusterDegraded()
+		if !ok {
+			break
+		}
+
+		select {
+		case <-ctx.Done():
+			c.Shutdown()
+			return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
+		default:
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	c.logger.Info().Log("Cluster is operational")
+
+	if c.isTLSRequired && c.IsRaftLeader() {
+		names, err := c.getClusterHostnames()
+		if err != nil {
+			c.Shutdown()
+			return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err)
+		}
+
+		kvs, err := NewClusterKVS(c)
+		if err != nil {
+			c.Shutdown()
+			return nil, fmt.Errorf("cluster KVS: %w", err)
+		}
+
+		storage, err := NewClusterStorage(kvs, "core-cluster-certificates")
+		if err != nil {
+			c.Shutdown()
+			return nil, fmt.Errorf("certificate store: %w", err)
+		}
+
+		manager, err := autocert.New(autocert.Config{
+			Storage:         storage,
+			DefaultHostname: names[0],
+			EmailAddress:    c.config.TLS.Email,
+			IsProduction:    false,
+			Logger:          c.logger.WithComponent("Let's Encrypt"),
+		})
+		if err != nil {
+			c.Shutdown()
+			return nil, fmt.Errorf("certificate manager: %w", err)
+		}
+
+		c.certManager = manager
+
+		err = manager.AcquireCertificates(ctx, c.config.Address, names)
+		if err != nil {
+			c.Shutdown()
+			return nil, fmt.Errorf("failed to acquire certificates: %w", err)
+		}
+	}
+
+	if !c.IsRaftLeader() {
+		for {
+			err := c.IsReady("")
+			if err == nil {
+				break
+			}
+
+			select {
+			case <-ctx.Done():
+				c.Shutdown()
+				return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
+			default:
+			}
+
+			time.Sleep(time.Second)
+		}
+	}
+
+	c.ready = true
+
+	c.logger.Info().Log("Cluster is ready")
+
 	return c, nil
 }
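The three wait loops added to New follow one pattern: poll a condition and abort through the caller's context, so a stuck cluster cannot block process start-up forever (api.go passes a 30-second timeout). Condensed into a hypothetical helper for illustration; waitFor is not in the diff:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // waitFor is a hypothetical condensation of the start-up gates in New:
    // leader elected, cluster no longer degraded, follower sees the leader
    // report ready.
    func waitFor(ctx context.Context, interval time.Duration, done func() bool) error {
        for !done() {
            select {
            case <-ctx.Done():
                return fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
            default:
            }

            time.Sleep(interval)
        }

        return nil
    }

    // Example (inside New):
    //   waitFor(ctx, 100*time.Millisecond, func() bool { return len(c.raft.Leader()) != 0 })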
"core-cluster-certificates") + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("certificate store: %w", err) + } + + manager, err := autocert.New(autocert.Config{ + Storage: storage, + DefaultHostname: names[0], + EmailAddress: c.config.TLS.Email, + IsProduction: false, + Logger: c.logger.WithComponent("Let's Encrypt"), + }) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("certificate manager: %w", err) + } + + c.certManager = manager + + err = manager.AcquireCertificates(ctx, c.config.Address, names) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("failed to acquire certificates: %w", err) + } + } + + if !c.IsRaftLeader() { + for { + err := c.IsReady("") + if err == nil { + break + } + + select { + case <-ctx.Done(): + c.Shutdown() + return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err()) + default: + } + + time.Sleep(time.Second) + } + } + + c.ready = true + + c.logger.Info().Log("Cluster is ready") + return c, nil } @@ -368,6 +487,10 @@ func (c *cluster) CoreConfig() *config.Config { return c.config.Clone() } +func (c *cluster) CertManager() autocert.Manager { + return c.certManager +} + func (c *cluster) Shutdown() error { c.logger.Info().Log("Shutting down cluster") c.shutdownLock.Lock() @@ -417,6 +540,17 @@ func (c *cluster) IsDegraded() (bool, error) { c.stateLock.Lock() defer c.stateLock.Unlock() + if c.isDegraded { + return c.isDegraded, c.isDegradedErr + } + + return c.isCoreDegraded, c.isCoreDegradedErr +} + +func (c *cluster) IsClusterDegraded() (bool, error) { + c.stateLock.Lock() + defer c.stateLock.Unlock() + return c.isDegraded, c.isDegradedErr } @@ -701,6 +835,19 @@ func (c *cluster) trackNodeChanges() { c.isDegradedErr = nil } c.stateLock.Unlock() + + // Put the cluster in "coreDegraded" mode in case there's a mismatch in expected values + err = c.checkClusterCoreNodes() + + c.stateLock.Lock() + if err != nil { + c.isCoreDegraded = true + c.isCoreDegradedErr = err + } else { + c.isCoreDegraded = false + c.isCoreDegradedErr = nil + } + c.stateLock.Unlock() case <-c.shutdownCh: return } @@ -733,6 +880,47 @@ func (c *cluster) checkClusterNodes() error { return nil } +func (c *cluster) checkClusterCoreNodes() error { + c.nodesLock.RLock() + defer c.nodesLock.RUnlock() + + for id, node := range c.nodes { + if status, err := node.CoreStatus(); status == "offline" { + return fmt.Errorf("node %s core is offline: %w", id, err) + } + } + + return nil +} + +func (c *cluster) getClusterHostnames() ([]string, error) { + hostnames := map[string]struct{}{} + + c.nodesLock.RLock() + defer c.nodesLock.RUnlock() + + for id, node := range c.nodes { + config, err := node.CoreConfig() + if err != nil { + return nil, fmt.Errorf("node %s has no configuration available: %w", id, err) + } + + for _, name := range config.Host.Name { + hostnames[name] = struct{}{} + } + } + + names := []string{} + + for key := range hostnames { + names = append(names, key) + } + + sort.Strings(names) + + return names, nil +} + func verifyClusterVersion(v string) error { version, err := ParseClusterVersion(v) if err != nil { @@ -771,16 +959,20 @@ func verifyClusterConfig(local, remote *config.Config) error { return fmt.Errorf("rtmp.enable is different") } - if local.RTMP.App != remote.RTMP.App { - return fmt.Errorf("rtmp.app is different") + if local.RTMP.Enable { + if local.RTMP.App != remote.RTMP.App { + return fmt.Errorf("rtmp.app is different") + } } if local.SRT.Enable != remote.SRT.Enable { return fmt.Errorf("srt.enable is different") } - if local.SRT.Passphrase 
@@ -1085,7 +1291,7 @@ func (c *cluster) RemoveIdentity(origin string, name string) error {
 }
 
 func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*Lock, error) {
-	if ok, _ := c.IsDegraded(); ok {
+	if ok, _ := c.IsClusterDegraded(); ok {
 		return nil, ErrDegraded
 	}
 
@@ -1123,7 +1329,7 @@ func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (
 }
 
 func (c *cluster) DeleteLock(origin string, name string) error {
-	if ok, _ := c.IsDegraded(); ok {
+	if ok, _ := c.IsClusterDegraded(); ok {
 		return ErrDegraded
 	}
 
@@ -1146,7 +1352,7 @@ func (c *cluster) ListLocks() map[string]time.Time {
 }
 
 func (c *cluster) SetKV(origin, key, value string) error {
-	if ok, _ := c.IsDegraded(); ok {
+	if ok, _ := c.IsClusterDegraded(); ok {
 		return ErrDegraded
 	}
 
@@ -1166,7 +1372,7 @@ func (c *cluster) SetKV(origin, key, value string) error {
 }
 
 func (c *cluster) UnsetKV(origin, key string) error {
-	if ok, _ := c.IsDegraded(); ok {
+	if ok, _ := c.IsClusterDegraded(); ok {
 		return ErrDegraded
 	}
 
@@ -1199,6 +1405,22 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value {
 	return storeValues
 }
 
+func (c *cluster) IsReady(origin string) error {
+	if ok, _ := c.IsClusterDegraded(); ok {
+		return ErrDegraded
+	}
+
+	if !c.IsRaftLeader() {
+		return c.forwarder.IsReady(origin)
+	}
+
+	if !c.ready {
+		return fmt.Errorf("not ready yet")
+	}
+
+	return nil
+}
+
 func (c *cluster) applyCommand(cmd *store.Command) error {
 	b, err := json.Marshal(cmd)
 	if err != nil {
diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go
index 4fe49378..9c796ab5 100644
--- a/cluster/forwarder/forwarder.go
+++ b/cluster/forwarder/forwarder.go
@@ -38,6 +38,8 @@ type Forwarder interface {
 
 	SetKV(origin, key, value string) error
 	UnsetKV(origin, key string) error
+
+	IsReady(origin string) error
 }
 
 type forwarder struct {
@@ -339,3 +341,15 @@ func (f *forwarder) UnsetKV(origin, key string) error {
 
 	return client.UnsetKV(origin, key)
 }
+
+func (f *forwarder) IsReady(origin string) error {
+	if origin == "" {
+		origin = f.id
+	}
+
+	f.lock.RLock()
+	client := f.client
+	f.lock.RUnlock()
+
+	return client.IsReady(origin)
+}
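The node.go change below separates the two health signals that used to be folded into one Status call, which is what allows trackNodeChanges to maintain a distinct "core degraded" state: a node can be a healthy raft peer while its core API is down. A sketch of a consumer, in-package since clusterNode is unexported; monitor is a hypothetical name:

    package cluster

    import "fmt"

    // monitor illustrates the split: Status reflects the cluster API,
    // CoreStatus the core API. checkClusterNodes and checkClusterCoreNodes
    // in cluster.go consume them separately, feeding isDegraded and
    // isCoreDegraded respectively.
    func monitor(nodes map[string]*clusterNode) (clusterErr, coreErr error) {
        for id, node := range nodes {
            if status, err := node.Status(); status == "offline" {
                clusterErr = fmt.Errorf("node %s is offline: %w", id, err)
            }

            if status, err := node.CoreStatus(); status == "offline" {
                coreErr = fmt.Errorf("node %s core is offline: %w", id, err)
            }
        }

        return clusterErr, coreErr
    }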
for %s because: %w", since, n.lastCoreContactErr) } diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 32484058..eed1bba3 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -488,7 +488,6 @@ func (r *raft) trackLeaderChanges() { obsCh := make(chan hcraft.Observation, 16) observer := hcraft.NewObserver(obsCh, false, func(o *hcraft.Observation) bool { _, leaderOK := o.Data.(hcraft.LeaderObservation) - return leaderOK })