diff --git a/app/api/api.go b/app/api/api.go index 23c747f8..e5bfc30e 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -2,7 +2,6 @@ package api import ( "context" - "crypto/tls" "fmt" "io" golog "log" @@ -492,9 +491,16 @@ func (a *api) start(ctx context.Context) error { return fmt.Errorf("failed to initialize autocert manager: %w", err) } - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) - err = manager.AcquireCertificates(ctx, cfg.Address, cfg.Host.Name) - cancel() + resctx, rescancel := context.WithCancel(ctx) + err = manager.HTTPChallengeResolver(resctx, cfg.Address) + + if err == nil { + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + err = manager.AcquireCertificates(ctx, cfg.Host.Name) + cancel() + } + + rescancel() autocertManager = manager @@ -1266,9 +1272,7 @@ func (a *api) start(ctx context.Context) error { a.log.logger.rtmps = a.log.logger.core.WithComponent("RTMPS").WithField("address", cfg.RTMP.AddressTLS) if autocertManager != nil { - config.TLSConfig = &tls.Config{ - GetCertificate: autocertManager.GetCertificate, - } + config.TLSConfig = autocertManager.TLSConfig() } } @@ -1476,9 +1480,7 @@ func (a *api) start(ctx context.Context) error { } if cfg.TLS.Auto { - a.mainserver.TLSConfig = &tls.Config{ - GetCertificate: autocertManager.GetCertificate, - } + a.mainserver.TLSConfig = autocertManager.TLSConfig() a.sidecarserver.Handler = autocertManager.HTTPChallengeHandler(sidecarserverhandler) } diff --git a/autocert/autocert.go b/autocert/autocert.go index e3b672f0..4285dde3 100644 --- a/autocert/autocert.go +++ b/autocert/autocert.go @@ -5,8 +5,6 @@ import ( "crypto/tls" "fmt" "net/http" - "net/http/httputil" - "net/url" "strings" "sync" "time" @@ -14,13 +12,18 @@ import ( "github.com/datarhei/core/v16/log" "github.com/caddyserver/certmagic" + "github.com/klauspost/cpuid/v2" "go.uber.org/zap" ) type Manager interface { - AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error - GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) + AcquireCertificates(ctx context.Context, hostnames []string) error + ManageCertificates(ctx context.Context, hostnames []string) error + HTTPChallengeResolver(ctx context.Context, listenAddress string) error HTTPChallengeHandler(h http.Handler) http.Handler + GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) + TLSConfig() *tls.Config + ManagedNames() []string } type Config struct { @@ -34,12 +37,16 @@ type Config struct { type manager struct { config *certmagic.Config + hostnames []string + lock sync.Mutex + logger log.Logger } func New(config Config) (Manager, error) { m := &manager{ - logger: config.Logger, + hostnames: []string{}, + logger: config.Logger, } if m.logger == nil { @@ -77,16 +84,38 @@ func New(config Config) (Manager, error) { return m, nil } +// HTTPChallengeHandler wraps h in a handler that can solve the ACME +// HTTP challenge. cfg is required, and it must have a certificate +// cache backed by a functional storage facility, since that is where +// the challenge state is stored between initiation and solution. +// +// If a request is not an ACME HTTP challenge, h will be invoked. func (m *manager) HTTPChallengeHandler(h http.Handler) http.Handler { acme := m.config.Issuers[0].(*certmagic.ACMEIssuer) return acme.HTTPChallengeHandler(h) } +// GetCertificate gets a certificate to satisfy clientHello. In getting +// the certificate, it abides the rules and settings defined in the Config +// that matches clientHello.ServerName. It tries to get certificates in +// this order: +// +// 1. Exact match in the in-memory cache +// 2. Wildcard match in the in-memory cache +// 3. Managers (if any) +// 4. Storage (if on-demand is enabled) +// 5. Issuers (if on-demand is enabled) +// +// This method is safe for use as a tls.Config.GetCertificate callback. +// +// GetCertificate will run in a new context. func (m *manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error) { return m.config.GetCertificate(hello) } -func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error { +// HTTPChallengeResolver starts a http server that responds to HTTP challenge requests and returns +// as soon as the server is running. Use the context to stop the server. +func (m *manager) HTTPChallengeResolver(ctx context.Context, listenAddress string) error { acme := m.config.Issuers[0].(*certmagic.ACMEIssuer) // Start temporary http server on configured port @@ -100,61 +129,187 @@ func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, MaxHeaderBytes: 1 << 20, } + errorCh := make(chan error, 1) + wg := sync.WaitGroup{} wg.Add(1) - go func() { - tempserver.ListenAndServe() + go func(errorCh chan<- error) { wg.Done() - }() - - var certerr error - - // Get the certificates - logger := m.logger.WithField("hostnames", hostnames) - logger.Info().Log("Acquiring certificate ...") - - err := m.config.ManageSync(ctx, hostnames) - - if err != nil { - certerr = fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(hostnames, ","), err) - } else { - logger.Info().Log("Successfully acquired certificate") - } - - // Shut down the temporary http server - tempserver.Close() + errorCh <- tempserver.ListenAndServe() + }(errorCh) wg.Wait() - return certerr -} - -func ProxyHTTPChallenge(ctx context.Context, listenAddress string, target *url.URL) error { - proxy := httputil.NewSingleHostReverseProxy(target) - - // Start temporary http server on configured port - tempserver := &http.Server{ - Addr: listenAddress, - Handler: proxy, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, + // Wait for an error + select { + case err := <-errorCh: + return err + case <-time.After(3 * time.Second): + break } - wg := sync.WaitGroup{} - wg.Add(1) + go func(ctx context.Context) { + <-ctx.Done() + tempserver.Close() - go func() { - tempserver.ListenAndServe() - wg.Done() - }() + // Drain and close the channel + select { + case <-errorCh: + default: + } - <-ctx.Done() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - tempserver.Shutdown(ctx) - cancel() + close(errorCh) + }(ctx) return nil } + +// AcquireCertificates tries to acquire the certificates for the given hostnames synchronously. +func (m *manager) AcquireCertificates(ctx context.Context, hostnames []string) error { + m.lock.Lock() + added, removed := diffStringSlice(hostnames, m.hostnames) + m.lock.Unlock() + + var err error + + if len(added) != 0 { + // Get the certificates + m.logger.WithField("hostnames", added).Info().Log("Acquiring certificates ...") + + err = m.config.ManageSync(ctx, added) + if err != nil { + return fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(added, ","), err) + } + + m.logger.WithField("hostnames", added).Info().Log("Successfully acquired certificate") + } + + if len(removed) != 0 { + m.logger.WithField("hostnames", removed).Info().Log("Unmanage certificates") + m.config.Unmanage(removed) + } + + m.lock.Lock() + m.hostnames = make([]string, len(hostnames)) + copy(m.hostnames, hostnames) + m.lock.Unlock() + + return err +} + +// ManageCertificates is the same as AcquireCertificates but it does it in the background. +func (m *manager) ManageCertificates(ctx context.Context, hostnames []string) error { + m.lock.Lock() + added, removed := diffStringSlice(hostnames, m.hostnames) + m.hostnames = make([]string, len(hostnames)) + copy(m.hostnames, hostnames) + m.lock.Unlock() + + if len(removed) != 0 { + m.logger.WithField("hostnames", removed).Info().Log("Unmanage certificates") + m.config.Unmanage(removed) + } + + if len(added) == 0 { + return nil + } + + m.logger.WithField("hostnames", added).Info().Log("Acquiring certificates") + + return m.config.ManageAsync(ctx, added) +} + +// ManagedNames returns a list of the currently managed domain names. +func (m *manager) ManagedNames() []string { + m.lock.Lock() + defer m.lock.Unlock() + + hostnames := make([]string, len(m.hostnames)) + copy(hostnames, m.hostnames) + + return hostnames +} + +// TLSConfig is an opinionated method that returns a recommended, modern +// TLS configuration that can be used to configure TLS listeners. Aside +// from safe, modern defaults, this method sets one critical field on the +// TLS config which is required to enable automatic certificate +// management: GetCertificate. +// +// The GetCertificate field is necessary to get certificates from memory +// or storage, including both manual and automated certificates. You +// should only change this field if you know what you are doing. +func (m *manager) TLSConfig() *tls.Config { + return &tls.Config{ + GetCertificate: m.GetCertificate, + + // the rest recommended for modern TLS servers + MinVersion: tls.VersionTLS12, + CurvePreferences: []tls.CurveID{ + tls.X25519, + tls.CurveP256, + }, + CipherSuites: preferredDefaultCipherSuites(), + PreferServerCipherSuites: true, + } +} + +// preferredDefaultCipherSuites returns an appropriate +// cipher suite to use depending on hardware support +// for AES-NI. +// +// See https://github.com/mholt/caddy/issues/1674 +// Copied from https://github.com/caddyserver/certmagic/blob/d8e706f9b5011ecbaf20d3c1641e5446ad453613/crypto.go#L299 +func preferredDefaultCipherSuites() []uint16 { + if cpuid.CPU.Supports(cpuid.AESNI) { + return defaultCiphersPreferAES + } + return defaultCiphersPreferChaCha +} + +var ( + defaultCiphersPreferAES = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + } + defaultCiphersPreferChaCha = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + } +) + +// diffHostnames returns a list of newly added hostnames and a list of removed hostnames based +// the provided list and the list of currently managed hostnames. +func diffStringSlice(next, current []string) ([]string, []string) { + added, removed := []string{}, []string{} + + currentMap := map[string]struct{}{} + + for _, name := range current { + currentMap[name] = struct{}{} + } + + for _, name := range next { + if _, ok := currentMap[name]; ok { + delete(currentMap, name) + continue + } + + added = append(added, name) + } + + for name := range currentMap { + removed = append(removed, name) + } + + return added, removed +} diff --git a/cluster/api.go b/cluster/api.go index cb9df036..e7c7d4ca 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -21,7 +21,6 @@ import ( "strings" "github.com/datarhei/core/v16/cluster/client" - "github.com/datarhei/core/v16/http/errorhandler" "github.com/datarhei/core/v16/http/handler/util" httplog "github.com/datarhei/core/v16/http/log" mwlog "github.com/datarhei/core/v16/http/middleware/log" @@ -75,7 +74,7 @@ func NewAPI(config APIConfig) (API, error) { a.router = echo.New() a.router.Debug = true - a.router.HTTPErrorHandler = errorhandler.HTTPErrorHandler + a.router.HTTPErrorHandler = ErrorHandler a.router.Validator = validator.New() a.router.HideBanner = true a.router.HidePort = true @@ -96,28 +95,10 @@ func NewAPI(config APIConfig) (API, error) { doc := a.router.Group("/v1/swagger/*") doc.GET("", echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI"))) - a.router.GET("/", func(c echo.Context) error { - return c.JSON(http.StatusOK, Version.String()) - }) + a.router.GET("/", a.Version) + a.router.GET("/v1/about", a.Version) - a.router.GET("/v1/about", func(c echo.Context) error { - return c.JSON(http.StatusOK, Version.String()) - }) - - a.router.GET("/v1/ready", func(c echo.Context) error { - origin := c.Request().Header.Get("X-Cluster-Origin") - - if origin == a.id { - return Err(http.StatusLoopDetected, "", "breaking circuit") - } - - err := a.cluster.IsReady(origin) - if err != nil { - return Err(http.StatusLocked, "", "not ready yet") - } - - return c.JSON(http.StatusOK, "OK") - }) + a.router.GET("/v1/barrier/:name", a.Barrier) a.router.POST("/v1/server", a.AddServer) a.router.DELETE("/v1/server/:id", a.RemoveServer) @@ -139,6 +120,7 @@ func NewAPI(config APIConfig) (API, error) { a.router.DELETE("/v1/lock/:name", a.Unlock) a.router.POST("/v1/kv", a.SetKV) + a.router.GET("/v1/kv/:key", a.GetKV) a.router.DELETE("/v1/kv/:key", a.UnsetKV) a.router.GET("/v1/core", a.CoreAPIAddress) @@ -157,6 +139,34 @@ func (a *api) Shutdown(ctx context.Context) error { return a.router.Shutdown(ctx) } +// Version returns the version of the cluster +// @Summary The cluster version +// @Description The cluster version +// @Tags v1.0.0 +// @ID cluster-1-version +// @Produce json +// @Success 200 {string} string +// @Success 500 {object} Error +// @Router /v1/version [get] +func (a *api) Version(c echo.Context) error { + return c.JSON(http.StatusOK, Version.String()) +} + +// Barrier returns if the barrier already has been passed +// @Summary Has the barrier already has been passed +// @Description Has the barrier already has been passed +// @Tags v1.0.0 +// @ID cluster-1-barrier +// @Produce json +// @Param name path string true "Barrier name" +// @Success 200 {string} string +// @Success 404 {object} Error +// @Router /v1/barrier/{name} [get] +func (a *api) Barrier(c echo.Context) error { + name := util.PathParam(c, "name") + return c.JSON(http.StatusOK, a.cluster.GetBarrier(name)) +} + // AddServer adds a new server to the cluster // @Summary Add a new server // @Description Add a new server to the cluster @@ -239,7 +249,7 @@ func (a *api) RemoveServer(c echo.Context) error { // @ID cluster-1-snapshot // @Produce application/octet-stream // @Success 200 {file} byte -// @Success 500 {array} Error +// @Success 500 {object} Error // @Router /v1/snapshot [get] func (a *api) Snapshot(c echo.Context) error { origin := c.Request().Header.Get("X-Cluster-Origin") @@ -747,8 +757,8 @@ func (a *api) SetKV(c echo.Context) error { } // UnsetKV removes a key -// @Summary Removes a key -// @Description Removes a key +// @Summary Remove a key +// @Description Remove a key // @Tags v1.0.0 // @ID cluster-1-kv-unset // @Produce json @@ -773,6 +783,7 @@ func (a *api) UnsetKV(c echo.Context) error { err := a.cluster.UnsetKV(origin, key) if err != nil { if err == fs.ErrNotExist { + a.logger.Debug().WithError(err).WithField("key", key).Log("Delete key: not found") return Err(http.StatusNotFound, "", "%s", err.Error()) } a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to remove key") @@ -782,6 +793,48 @@ func (a *api) UnsetKV(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } +// GetKV fetches a key +// @Summary Fetch a key +// @Description Fetch a key +// @Tags v1.0.0 +// @ID cluster-1-kv-get +// @Produce json +// @Param name path string true "Key name" +// @Param X-Cluster-Origin header string false "Origin ID of request" +// @Success 200 {string} string +// @Failure 404 {object} Error +// @Failure 500 {object} Error +// @Failure 508 {object} Error +// @Router /v1/kv/{key} [get] +func (a *api) GetKV(c echo.Context) error { + key := util.PathParam(c, "key") + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + a.logger.Debug().WithField("key", key).Log("Get key") + + value, updatedAt, err := a.cluster.GetKV(origin, key) + if err != nil { + if err == fs.ErrNotExist { + a.logger.Debug().WithError(err).WithField("key", key).Log("Get key: not found") + return Err(http.StatusNotFound, "", "%s", err.Error()) + } + a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to retrieve key") + return Err(http.StatusInternalServerError, "", "unable to retrieve key: %s", err.Error()) + } + + res := client.GetKVResponse{ + Value: value, + UpdatedAt: updatedAt, + } + + return c.JSON(http.StatusOK, res) +} + // Error represents an error response of the API type Error struct { Code int `json:"code" jsonschema:"required" format:"int"` @@ -816,3 +869,50 @@ func Err(code int, message string, args ...interface{}) Error { return e } + +// ErrorHandler is a genral handler for echo handler errors +func ErrorHandler(err error, c echo.Context) { + var code int = 0 + var details []string + message := "" + + if he, ok := err.(Error); ok { + code = he.Code + message = he.Message + details = he.Details + } else if he, ok := err.(*echo.HTTPError); ok { + if he.Internal != nil { + if herr, ok := he.Internal.(*echo.HTTPError); ok { + he = herr + } + } + + code = he.Code + message = http.StatusText(he.Code) + if len(message) == 0 { + switch code { + case 509: + message = "Bandwith limit exceeded" + default: + } + } + details = strings.Split(fmt.Sprintf("%v", he.Message), "\n") + } else { + code = http.StatusInternalServerError + message = http.StatusText(http.StatusInternalServerError) + details = strings.Split(fmt.Sprintf("%s", err), "\n") + } + + // Send response + if !c.Response().Committed { + if c.Request().Method == http.MethodHead { + c.NoContent(code) + } else { + c.JSON(code, Error{ + Code: code, + Message: message, + Details: details, + }) + } + } +} diff --git a/cluster/client/client.go b/cluster/client/client.go index c42c648e..b2f9bffe 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "net/http" "net/url" "time" @@ -59,6 +60,11 @@ type SetKVRequest struct { Value string `json:"value"` } +type GetKVResponse struct { + Value string `json:"value"` + UpdatedAt time.Time `json:"updated_at"` +} + type APIClient struct { Address string Client *http.Client @@ -79,6 +85,21 @@ func (c *APIClient) Version() (string, error) { return version, nil } +func (c *APIClient) Barrier(name string) (bool, error) { + data, err := c.call(http.MethodGet, "/v1/barrier/"+url.PathEscape(name), "application/json", nil, "") + if err != nil { + return false, err + } + + var passed bool + err = json.Unmarshal(data, &passed) + if err != nil { + return false, err + } + + return passed, nil +} + func (c *APIClient) CoreAPIAddress() (string, error) { data, err := c.call(http.MethodGet, "/v1/core", "", nil, "") if err != nil { @@ -245,20 +266,40 @@ func (c *APIClient) SetKV(origin string, r SetKVRequest) error { func (c *APIClient) UnsetKV(origin string, key string) error { _, err := c.call(http.MethodDelete, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin) + if err != nil { + e, ok := err.(httpapi.Error) + if ok && e.Code == 404 { + return fs.ErrNotExist + } + } return err } +func (c *APIClient) GetKV(origin string, key string) (string, time.Time, error) { + data, err := c.call(http.MethodGet, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin) + if err != nil { + e, ok := err.(httpapi.Error) + if ok && e.Code == 404 { + return "", time.Time{}, fs.ErrNotExist + } + + return "", time.Time{}, err + } + + res := GetKVResponse{} + err = json.Unmarshal(data, &res) + if err != nil { + return "", time.Time{}, err + } + + return res.Value, res.UpdatedAt, nil +} + func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) { return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin) } -func (c *APIClient) IsReady(origin string) error { - _, err := c.call(http.MethodGet, "/v1/ready", "application/json", nil, origin) - - return err -} - func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) { if len(c.Address) == 0 { return nil, fmt.Errorf("no address defined") diff --git a/cluster/cluster.go b/cluster/cluster.go index 2b612e46..ae3f3bef 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -44,7 +44,9 @@ type Cluster interface { CoreConfig() *config.Config About() (ClusterAbout, error) - IsReady(origin string) error + IsClusterDegraded() (bool, error) + IsDegraded() (bool, error) + GetBarrier(name string) bool Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error // gracefully remove a node from the cluster @@ -76,7 +78,7 @@ type Cluster interface { SetKV(origin, key, value string) error UnsetKV(origin, key string) error - GetKV(key string) (string, time.Time, error) + GetKV(origin, key string) (string, time.Time, error) ListKV(prefix string) map[string]store.Value ProxyReader() proxy.ProxyReader @@ -155,7 +157,8 @@ type cluster struct { nodes map[string]*clusterNode nodesLock sync.RWMutex - ready bool + barrier map[string]bool + barrierLock sync.RWMutex limiter net.IPLimiter } @@ -187,6 +190,8 @@ func New(ctx context.Context, config Config) (Cluster, error) { config: config.CoreConfig, nodes: map[string]*clusterNode{}, + barrier: map[string]bool{}, + limiter: config.IPLimiter, } @@ -352,6 +357,16 @@ func New(ctx context.Context, config Config) (Cluster, error) { go c.monitorLeadership() go c.sentinel() + err = c.setup(ctx) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("failed to setup cluster: %w", err) + } + + return c, nil +} + +func (c *cluster) setup(ctx context.Context) error { // Wait for a leader to be selected c.logger.Info().Log("Waiting for a leader to be elected ...") @@ -364,8 +379,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { select { case <-ctx.Done(): - c.Shutdown() - return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err()) + return fmt.Errorf("starting cluster has been aborted: %w", ctx.Err()) default: } @@ -374,7 +388,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { c.logger.Info().Log("Leader has been elected") - // Wait for cluster to leave degraded mode + // Wait for all cluster nodes to leave degraded mode c.logger.Info().Log("Waiting for cluster to become operational ...") @@ -384,125 +398,128 @@ func New(ctx context.Context, config Config) (Cluster, error) { break } - c.logger.Warn().WithError(err).Log("Cluster is degraded") + c.logger.Warn().WithError(err).Log("Cluster is in degraded state") select { case <-ctx.Done(): - c.Shutdown() - return nil, fmt.Errorf("starting cluster has been aborted: %w: %w", ctx.Err(), err) + return fmt.Errorf("starting cluster has been aborted: %w: %w", ctx.Err(), err) default: } time.Sleep(time.Second) } + err := c.Barrier(ctx, "operational") + if err != nil { + return fmt.Errorf("failed on barrier: %w", err) + } + c.logger.Info().Log("Cluster is operational") if c.isTLSRequired { + c.logger.Info().Log("Waiting for TLS certificates ...") + // Create certificate manager - names, err := c.getClusterHostnames() + hostnames, err := c.getClusterHostnames() if err != nil { - c.Shutdown() - return nil, fmt.Errorf("tls: failed to assemble list of all configured hostnames: %w", err) + return fmt.Errorf("tls: failed to assemble list of all configured hostnames: %w", err) } - if len(names) == 0 { - c.Shutdown() - return nil, fmt.Errorf("tls: no hostnames are configured") + if len(hostnames) == 0 { + return fmt.Errorf("no hostnames are configured") } kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS")) if err != nil { - c.Shutdown() - return nil, fmt.Errorf("tls: cluster KVS: %w", err) + return fmt.Errorf("tls: cluster KVS: %w", err) } - storage, err := NewClusterStorage(kvs, "core-cluster-certificates") + storage, err := NewClusterStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS")) if err != nil { - c.Shutdown() - return nil, fmt.Errorf("tls: certificate store: %w", err) + return fmt.Errorf("tls: certificate store: %w", err) } manager, err := autocert.New(autocert.Config{ Storage: storage, - DefaultHostname: names[0], + DefaultHostname: hostnames[0], EmailAddress: c.config.TLS.Email, IsProduction: !c.config.TLS.Staging, Logger: c.logger.WithComponent("Let's Encrypt"), }) if err != nil { - c.Shutdown() - return nil, fmt.Errorf("tls: certificate manager: %w", err) + return fmt.Errorf("tls: certificate manager: %w", err) } c.certManager = manager - if c.IsRaftLeader() { - // Acquire certificates - err = manager.AcquireCertificates(ctx, c.config.Address, names) - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("tls: failed to acquire certificates: %w", err) - } + resctx, rescancel := context.WithCancel(ctx) + defer rescancel() + + err = manager.HTTPChallengeResolver(resctx, c.config.Address) + if err != nil { + return fmt.Errorf("tls: failed to start the HTTP challenge resolver: %w", err) } + + // We have to wait for all nodes to have the HTTP challenge resolver started + err = c.Barrier(ctx, "acme") + if err != nil { + return fmt.Errorf("tls: failed on barrier: %w", err) + } + + // Acquire certificates, all nodes can do this at the same time because everything + // is synched via the storage. + err = manager.AcquireCertificates(ctx, hostnames) + if err != nil { + return fmt.Errorf("tls: failed to acquire certificates: %w", err) + } + + rescancel() + + c.logger.Info().Log("TLS certificates acquired") } - if !c.IsRaftLeader() { - tempctx, cancel := context.WithCancel(context.Background()) + c.logger.Info().Log("Waiting for cluster to become ready ...") - if c.isTLSRequired { - // All followers forward any HTTP requests to the leader such that it can respond to the HTTP challenge - leaderAddress, _ := c.raft.Leader() - leader, err := c.CoreAPIAddress(leaderAddress) - if err != nil { - cancel() - c.Shutdown() - return nil, fmt.Errorf("unable to find leader address: %w", err) - } - - url, err := url.Parse(leader) - if err != nil { - cancel() - return nil, fmt.Errorf("unable to parse leader address: %w", err) - } - - url.Scheme = "http" - url.Path = "/" - url.User = nil - url.RawQuery = "" - - go func() { - c.logger.Info().WithField("leader", url.String()).Log("Forwarding ACME challenges to leader") - autocert.ProxyHTTPChallenge(tempctx, c.config.Address, url) - c.logger.Info().WithField("leader", url.String()).Log("Stopped forwarding ACME challenges to leader") - }() - } - - for { - // Ask leader if it is ready - err := c.IsReady("") - if err == nil { - cancel() - break - } - - select { - case <-ctx.Done(): - cancel() - c.Shutdown() - return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err()) - default: - } - - time.Sleep(time.Second) - } + err = c.Barrier(ctx, "ready") + if err != nil { + return fmt.Errorf("failed on barrier: %w", err) } - c.ready = true - c.logger.Info().Log("Cluster is ready") - return c, nil + return nil +} + +func (c *cluster) GetBarrier(name string) bool { + c.barrierLock.RLock() + defer c.barrierLock.RUnlock() + + return c.barrier[name] +} + +func (c *cluster) Barrier(ctx context.Context, name string) error { + c.barrierLock.Lock() + c.barrier[name] = true + c.barrierLock.Unlock() + + for { + ok, err := c.getClusterBarrier(name) + if ok { + break + } + + c.logger.Warn().WithField("name", name).WithError(err).Log("Waiting for barrier") + + select { + case <-ctx.Done(): + return fmt.Errorf("barrier %s: starting cluster has been aborted: %w: %w", name, ctx.Err(), err) + default: + } + + time.Sleep(time.Second) + } + + return nil } func (c *cluster) Address() string { @@ -924,7 +941,7 @@ func (c *cluster) trackNodeChanges() { c.nodesLock.Unlock() // Put the cluster in "degraded" mode in case there's a mismatch in expected values - err = c.checkClusterNodes() + _, err = c.checkClusterNodes() c.stateLock.Lock() if err != nil { @@ -948,36 +965,62 @@ func (c *cluster) trackNodeChanges() { c.isCoreDegradedErr = nil } c.stateLock.Unlock() + /* + if c.isTLSRequired { + // Update list of managed hostnames + if c.certManager != nil { + c.certManager.ManageCertificates(context.Background(), hostnames) + } + + } + */ case <-c.shutdownCh: return } } } -func (c *cluster) checkClusterNodes() error { +// checkClusterNodes returns a list of all hostnames configured on all nodes. The +// returned list will not contain any duplicates. An error is returned in case the +// node is not compatible. +func (c *cluster) checkClusterNodes() ([]string, error) { + hostnames := map[string]struct{}{} + c.nodesLock.RLock() defer c.nodesLock.RUnlock() for id, node := range c.nodes { if status, err := node.Status(); status == "offline" { - return fmt.Errorf("node %s is offline: %w", id, err) + return nil, fmt.Errorf("node %s is offline: %w", id, err) } version := node.Version() if err := verifyClusterVersion(version); err != nil { - return fmt.Errorf("node %s has a different version: %s: %w", id, version, err) + return nil, fmt.Errorf("node %s has a different version: %s: %w", id, version, err) } config, err := node.CoreConfig() if err != nil { - return fmt.Errorf("node %s has no configuration available: %w", id, err) + return nil, fmt.Errorf("node %s has no configuration available: %w", id, err) } if err := verifyClusterConfig(c.config, config); err != nil { - return fmt.Errorf("node %s has a different configuration: %w", id, err) + return nil, fmt.Errorf("node %s has a different configuration: %w", id, err) + } + + for _, name := range config.Host.Name { + hostnames[name] = struct{}{} } } - return nil + names := []string{} + + for key := range hostnames { + names = append(names, key) + } + + sort.Strings(names) + + return names, nil } func (c *cluster) checkClusterCoreNodes() error { @@ -993,6 +1036,8 @@ func (c *cluster) checkClusterCoreNodes() error { return nil } +// getClusterHostnames return a list of all hostnames configured on all nodes. The +// returned list will not contain any duplicates. func (c *cluster) getClusterHostnames() ([]string, error) { hostnames := map[string]struct{}{} @@ -1021,6 +1066,21 @@ func (c *cluster) getClusterHostnames() ([]string, error) { return names, nil } +// getClusterBarrier returns whether all nodes are currently on the same barrier. +func (c *cluster) getClusterBarrier(name string) (bool, error) { + c.nodesLock.RLock() + defer c.nodesLock.RUnlock() + + for _, node := range c.nodes { + ok, err := node.Barrier(name) + if !ok { + return false, err + } + } + + return true, nil +} + func verifyClusterVersion(v string) error { version, err := ParseClusterVersion(v) if err != nil { @@ -1500,7 +1560,15 @@ func (c *cluster) UnsetKV(origin, key string) error { return c.applyCommand(cmd) } -func (c *cluster) GetKV(key string) (string, time.Time, error) { +func (c *cluster) GetKV(origin, key string) (string, time.Time, error) { + if ok, _ := c.IsClusterDegraded(); ok { + return "", time.Time{}, ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.GetKV(origin, key) + } + value, err := c.store.GetFromKVS(key) if err != nil { return "", time.Time{}, err @@ -1515,22 +1583,6 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value { return storeValues } -func (c *cluster) IsReady(origin string) error { - if ok, _ := c.IsClusterDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.IsReady(origin) - } - - if !c.ready { - return fmt.Errorf("no ready yet") - } - - return nil -} - func (c *cluster) applyCommand(cmd *store.Command) error { b, err := json.Marshal(cmd) if err != nil { diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 7a2121cc..ba957dfd 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -24,6 +24,42 @@ const docTemplateClusterAPI = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/v1/barrier/{name}": { + "get": { + "description": "Has the barrier already has been passed", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Has the barrier already has been passed", + "operationId": "cluster-1-barrier", + "parameters": [ + { + "type": "string", + "description": "Barrier name", + "name": "name", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/core": { "get": { "description": "Core API address and login of this node", @@ -333,15 +369,67 @@ const docTemplateClusterAPI = `{ } }, "/v1/kv/{key}": { - "delete": { - "description": "Removes a key", + "get": { + "description": "Fetch a key", "produces": [ "application/json" ], "tags": [ "v1.0.0" ], - "summary": "Removes a key", + "summary": "Fetch a key", + "operationId": "cluster-1-kv-get", + "parameters": [ + { + "type": "string", + "description": "Key name", + "name": "name", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Origin ID of request", + "name": "X-Cluster-Origin", + "in": "header" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + }, + "delete": { + "description": "Remove a key", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Remove a key", "operationId": "cluster-1-kv-unset", "parameters": [ { @@ -910,10 +998,34 @@ const docTemplateClusterAPI = `{ "500": { "description": "Internal Server Error", "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/cluster.Error" - } + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, + "/v1/version": { + "get": { + "description": "The cluster version", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "The cluster version", + "operationId": "cluster-1-version", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" } } } diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index 042d9af8..d59e4202 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -16,6 +16,42 @@ }, "basePath": "/", "paths": { + "/v1/barrier/{name}": { + "get": { + "description": "Has the barrier already has been passed", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Has the barrier already has been passed", + "operationId": "cluster-1-barrier", + "parameters": [ + { + "type": "string", + "description": "Barrier name", + "name": "name", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/core": { "get": { "description": "Core API address and login of this node", @@ -325,15 +361,67 @@ } }, "/v1/kv/{key}": { - "delete": { - "description": "Removes a key", + "get": { + "description": "Fetch a key", "produces": [ "application/json" ], "tags": [ "v1.0.0" ], - "summary": "Removes a key", + "summary": "Fetch a key", + "operationId": "cluster-1-kv-get", + "parameters": [ + { + "type": "string", + "description": "Key name", + "name": "name", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Origin ID of request", + "name": "X-Cluster-Origin", + "in": "header" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + }, + "delete": { + "description": "Remove a key", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Remove a key", "operationId": "cluster-1-kv-unset", "parameters": [ { @@ -902,10 +990,34 @@ "500": { "description": "Internal Server Error", "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/cluster.Error" - } + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, + "/v1/version": { + "get": { + "description": "The cluster version", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "The cluster version", + "operationId": "cluster-1-version", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" } } } diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index 2a39a7fb..726e5d88 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -682,6 +682,30 @@ info: title: datarhei Core Cluster API version: "1.0" paths: + /v1/barrier/{name}: + get: + description: Has the barrier already has been passed + operationId: cluster-1-barrier + parameters: + - description: Barrier name + in: path + name: name + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "404": + description: Not Found + schema: + $ref: '#/definitions/cluster.Error' + summary: Has the barrier already has been passed + tags: + - v1.0.0 /v1/core: get: description: Core API address and login of this node @@ -890,7 +914,7 @@ paths: - v1.0.0 /v1/kv/{key}: delete: - description: Removes a key + description: Remove a key operationId: cluster-1-kv-unset parameters: - description: Key name @@ -921,7 +945,42 @@ paths: description: Loop Detected schema: $ref: '#/definitions/cluster.Error' - summary: Removes a key + summary: Remove a key + tags: + - v1.0.0 + get: + description: Fetch a key + operationId: cluster-1-kv-get + parameters: + - description: Key name + in: path + name: name + required: true + type: string + - description: Origin ID of request + in: header + name: X-Cluster-Origin + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "404": + description: Not Found + schema: + $ref: '#/definitions/cluster.Error' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/cluster.Error' + "508": + description: Loop Detected + schema: + $ref: '#/definitions/cluster.Error' + summary: Fetch a key tags: - v1.0.0 /v1/lock: @@ -1276,10 +1335,26 @@ paths: "500": description: Internal Server Error schema: - items: - $ref: '#/definitions/cluster.Error' - type: array + $ref: '#/definitions/cluster.Error' summary: Cluster DB snapshot tags: - v1.0.0 + /v1/version: + get: + description: The cluster version + operationId: cluster-1-version + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/cluster.Error' + summary: The cluster version + tags: + - v1.0.0 swagger: "2.0" diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index 9d12d92c..a2be10ab 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -38,8 +38,7 @@ type Forwarder interface { SetKV(origin, key, value string) error UnsetKV(origin, key string) error - - IsReady(origin string) error + GetKV(origin, key string) (string, time.Time, error) } type forwarder struct { @@ -342,7 +341,7 @@ func (f *forwarder) UnsetKV(origin, key string) error { return client.UnsetKV(origin, key) } -func (f *forwarder) IsReady(origin string) error { +func (f *forwarder) GetKV(origin, key string) (string, time.Time, error) { if origin == "" { origin = f.id } @@ -351,5 +350,5 @@ func (f *forwarder) IsReady(origin string) error { client := f.client f.lock.RUnlock() - return client.IsReady(origin) + return client.GetKV(origin, key) } diff --git a/cluster/kvs.go b/cluster/kvs.go index 12b499a3..22f03ce0 100644 --- a/cluster/kvs.go +++ b/cluster/kvs.go @@ -93,6 +93,7 @@ func (s *clusterKVS) DeleteLock(name string) error { } func (s *clusterKVS) ListLocks() map[string]time.Time { + s.logger.Debug().Log("List locks") return s.cluster.ListLocks() } @@ -111,10 +112,11 @@ func (s *clusterKVS) UnsetKV(key string) error { func (s *clusterKVS) GetKV(key string) (string, time.Time, error) { s.logger.Debug().WithField("key", key).Log("Get KV") - return s.cluster.GetKV(key) + return s.cluster.GetKV("", key) } func (s *clusterKVS) ListKV(prefix string) map[string]store.Value { + s.logger.Debug().Log("List KV") return s.cluster.ListKV(prefix) } diff --git a/cluster/node.go b/cluster/node.go index 712268da..bb3a81a4 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -193,6 +193,10 @@ func (n *clusterNode) CoreAPIAddress() (string, error) { return n.client.CoreAPIAddress() } +func (n *clusterNode) Barrier(name string) (bool, error) { + return n.client.Barrier(name) +} + func (n *clusterNode) Proxy() proxy.Node { return n.proxyNode } diff --git a/cluster/tls.go b/cluster/tls.go index d1f42143..a8649c62 100644 --- a/cluster/tls.go +++ b/cluster/tls.go @@ -11,6 +11,7 @@ import ( "time" "github.com/caddyserver/certmagic" + "github.com/datarhei/core/v16/log" ) type clusterStorage struct { @@ -18,13 +19,20 @@ type clusterStorage struct { prefix string locks map[string]*Lock muLocks sync.Mutex + + logger log.Logger } -func NewClusterStorage(kvs KVS, prefix string) (certmagic.Storage, error) { +func NewClusterStorage(kvs KVS, prefix string, logger log.Logger) (certmagic.Storage, error) { s := &clusterStorage{ kvs: kvs, prefix: prefix, locks: map[string]*Lock{}, + logger: logger, + } + + if s.logger == nil { + s.logger = log.New("") } return s, nil @@ -39,6 +47,7 @@ func (s *clusterStorage) unprefixKey(key string) string { } func (s *clusterStorage) Lock(ctx context.Context, name string) error { + s.logger.Debug().WithField("name", name).Log("StorageLock") for { lock, err := s.kvs.CreateLock(s.prefixKey(name), time.Now().Add(time.Minute)) if err == nil { @@ -65,13 +74,18 @@ func (s *clusterStorage) Lock(ctx context.Context, name string) error { } func (s *clusterStorage) Unlock(ctx context.Context, name string) error { + s.logger.Debug().WithField("name", name).Log("StorageUnlock") err := s.kvs.DeleteLock(s.prefixKey(name)) if err != nil { return err } s.muLocks.Lock() - delete(s.locks, name) + lock, ok := s.locks[name] + if ok { + lock.Unlock() + delete(s.locks, name) + } s.muLocks.Unlock() return nil @@ -79,14 +93,17 @@ func (s *clusterStorage) Unlock(ctx context.Context, name string) error { // Store puts value at key. func (s *clusterStorage) Store(ctx context.Context, key string, value []byte) error { + s.logger.Debug().WithField("key", key).Log("StorageStore") encodedValue := base64.StdEncoding.EncodeToString(value) return s.kvs.SetKV(s.prefixKey(key), encodedValue) } // Load retrieves the value at key. func (s *clusterStorage) Load(ctx context.Context, key string) ([]byte, error) { + s.logger.Debug().WithField("key", key).Log("StorageLoad") encodedValue, _, err := s.kvs.GetKV(s.prefixKey(key)) if err != nil { + s.logger.Debug().WithError(err).WithField("key", key).Log("StorageLoad") return nil, err } @@ -97,12 +114,14 @@ func (s *clusterStorage) Load(ctx context.Context, key string) ([]byte, error) { // returned only if the key still exists // when the method returns. func (s *clusterStorage) Delete(ctx context.Context, key string) error { + s.logger.Debug().WithField("key", key).Log("StorageDelete") return s.kvs.UnsetKV(s.prefixKey(key)) } // Exists returns true if the key exists // and there was no error checking. func (s *clusterStorage) Exists(ctx context.Context, key string) bool { + s.logger.Debug().WithField("key", key).Log("StorageExits") _, _, err := s.kvs.GetKV(s.prefixKey(key)) return err == nil } @@ -113,6 +132,7 @@ func (s *clusterStorage) Exists(ctx context.Context, key string) bool { // should be walked); otherwise, only keys // prefixed exactly by prefix will be listed. func (s *clusterStorage) List(ctx context.Context, prefix string, recursive bool) ([]string, error) { + s.logger.Debug().WithField("prefix", prefix).Log("StorageList") values := s.kvs.ListKV(s.prefixKey(prefix)) keys := []string{} @@ -147,6 +167,7 @@ func (s *clusterStorage) List(ctx context.Context, prefix string, recursive bool // Stat returns information about key. func (s *clusterStorage) Stat(ctx context.Context, key string) (certmagic.KeyInfo, error) { + s.logger.Debug().WithField("key", key).Log("StorageStat") encodedValue, lastModified, err := s.kvs.GetKV(s.prefixKey(key)) if err != nil { return certmagic.KeyInfo{}, err diff --git a/cluster/tls_test.go b/cluster/tls_test.go index 78837946..625d0cbf 100644 --- a/cluster/tls_test.go +++ b/cluster/tls_test.go @@ -17,7 +17,7 @@ func setupStorage() (certmagic.Storage, error) { return nil, err } - return NewClusterStorage(kvs, "some_prefix") + return NewClusterStorage(kvs, "some_prefix", nil) } func TestStorageStore(t *testing.T) {