Allow to acquire certificates in multi-node cluster

This commit is contained in:
Ingo Oppermann
2023-06-29 21:15:04 +02:00
parent 2b58c11bb1
commit 6e156d0f3a
13 changed files with 899 additions and 224 deletions

View File

@@ -2,7 +2,6 @@ package api
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"io" "io"
golog "log" golog "log"
@@ -492,9 +491,16 @@ func (a *api) start(ctx context.Context) error {
return fmt.Errorf("failed to initialize autocert manager: %w", err) return fmt.Errorf("failed to initialize autocert manager: %w", err)
} }
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) resctx, rescancel := context.WithCancel(ctx)
err = manager.AcquireCertificates(ctx, cfg.Address, cfg.Host.Name) err = manager.HTTPChallengeResolver(resctx, cfg.Address)
cancel()
if err == nil {
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
err = manager.AcquireCertificates(ctx, cfg.Host.Name)
cancel()
}
rescancel()
autocertManager = manager autocertManager = manager
@@ -1266,9 +1272,7 @@ func (a *api) start(ctx context.Context) error {
a.log.logger.rtmps = a.log.logger.core.WithComponent("RTMPS").WithField("address", cfg.RTMP.AddressTLS) a.log.logger.rtmps = a.log.logger.core.WithComponent("RTMPS").WithField("address", cfg.RTMP.AddressTLS)
if autocertManager != nil { if autocertManager != nil {
config.TLSConfig = &tls.Config{ config.TLSConfig = autocertManager.TLSConfig()
GetCertificate: autocertManager.GetCertificate,
}
} }
} }
@@ -1476,9 +1480,7 @@ func (a *api) start(ctx context.Context) error {
} }
if cfg.TLS.Auto { if cfg.TLS.Auto {
a.mainserver.TLSConfig = &tls.Config{ a.mainserver.TLSConfig = autocertManager.TLSConfig()
GetCertificate: autocertManager.GetCertificate,
}
a.sidecarserver.Handler = autocertManager.HTTPChallengeHandler(sidecarserverhandler) a.sidecarserver.Handler = autocertManager.HTTPChallengeHandler(sidecarserverhandler)
} }

View File

@@ -5,8 +5,6 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httputil"
"net/url"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -14,13 +12,18 @@ import (
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/caddyserver/certmagic" "github.com/caddyserver/certmagic"
"github.com/klauspost/cpuid/v2"
"go.uber.org/zap" "go.uber.org/zap"
) )
type Manager interface { type Manager interface {
AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error AcquireCertificates(ctx context.Context, hostnames []string) error
GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) ManageCertificates(ctx context.Context, hostnames []string) error
HTTPChallengeResolver(ctx context.Context, listenAddress string) error
HTTPChallengeHandler(h http.Handler) http.Handler HTTPChallengeHandler(h http.Handler) http.Handler
GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error)
TLSConfig() *tls.Config
ManagedNames() []string
} }
type Config struct { type Config struct {
@@ -34,12 +37,16 @@ type Config struct {
type manager struct { type manager struct {
config *certmagic.Config config *certmagic.Config
hostnames []string
lock sync.Mutex
logger log.Logger logger log.Logger
} }
func New(config Config) (Manager, error) { func New(config Config) (Manager, error) {
m := &manager{ m := &manager{
logger: config.Logger, hostnames: []string{},
logger: config.Logger,
} }
if m.logger == nil { if m.logger == nil {
@@ -77,16 +84,38 @@ func New(config Config) (Manager, error) {
return m, nil return m, nil
} }
// HTTPChallengeHandler wraps h in a handler that can solve the ACME
// HTTP challenge. cfg is required, and it must have a certificate
// cache backed by a functional storage facility, since that is where
// the challenge state is stored between initiation and solution.
//
// If a request is not an ACME HTTP challenge, h will be invoked.
func (m *manager) HTTPChallengeHandler(h http.Handler) http.Handler { func (m *manager) HTTPChallengeHandler(h http.Handler) http.Handler {
acme := m.config.Issuers[0].(*certmagic.ACMEIssuer) acme := m.config.Issuers[0].(*certmagic.ACMEIssuer)
return acme.HTTPChallengeHandler(h) return acme.HTTPChallengeHandler(h)
} }
// GetCertificate gets a certificate to satisfy clientHello. In getting
// the certificate, it abides the rules and settings defined in the Config
// that matches clientHello.ServerName. It tries to get certificates in
// this order:
//
// 1. Exact match in the in-memory cache
// 2. Wildcard match in the in-memory cache
// 3. Managers (if any)
// 4. Storage (if on-demand is enabled)
// 5. Issuers (if on-demand is enabled)
//
// This method is safe for use as a tls.Config.GetCertificate callback.
//
// GetCertificate will run in a new context.
func (m *manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error) { func (m *manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error) {
return m.config.GetCertificate(hello) return m.config.GetCertificate(hello)
} }
func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error { // HTTPChallengeResolver starts a http server that responds to HTTP challenge requests and returns
// as soon as the server is running. Use the context to stop the server.
func (m *manager) HTTPChallengeResolver(ctx context.Context, listenAddress string) error {
acme := m.config.Issuers[0].(*certmagic.ACMEIssuer) acme := m.config.Issuers[0].(*certmagic.ACMEIssuer)
// Start temporary http server on configured port // Start temporary http server on configured port
@@ -100,61 +129,187 @@ func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string,
MaxHeaderBytes: 1 << 20, MaxHeaderBytes: 1 << 20,
} }
errorCh := make(chan error, 1)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func(errorCh chan<- error) {
tempserver.ListenAndServe()
wg.Done() wg.Done()
}() errorCh <- tempserver.ListenAndServe()
}(errorCh)
var certerr error
// Get the certificates
logger := m.logger.WithField("hostnames", hostnames)
logger.Info().Log("Acquiring certificate ...")
err := m.config.ManageSync(ctx, hostnames)
if err != nil {
certerr = fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(hostnames, ","), err)
} else {
logger.Info().Log("Successfully acquired certificate")
}
// Shut down the temporary http server
tempserver.Close()
wg.Wait() wg.Wait()
return certerr // Wait for an error
} select {
case err := <-errorCh:
func ProxyHTTPChallenge(ctx context.Context, listenAddress string, target *url.URL) error { return err
proxy := httputil.NewSingleHostReverseProxy(target) case <-time.After(3 * time.Second):
break
// Start temporary http server on configured port
tempserver := &http.Server{
Addr: listenAddress,
Handler: proxy,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
} }
wg := sync.WaitGroup{} go func(ctx context.Context) {
wg.Add(1) <-ctx.Done()
tempserver.Close()
go func() { // Drain and close the channel
tempserver.ListenAndServe() select {
wg.Done() case <-errorCh:
}() default:
}
<-ctx.Done() close(errorCh)
}(ctx)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
tempserver.Shutdown(ctx)
cancel()
return nil return nil
} }
// AcquireCertificates tries to acquire the certificates for the given hostnames synchronously.
func (m *manager) AcquireCertificates(ctx context.Context, hostnames []string) error {
m.lock.Lock()
added, removed := diffStringSlice(hostnames, m.hostnames)
m.lock.Unlock()
var err error
if len(added) != 0 {
// Get the certificates
m.logger.WithField("hostnames", added).Info().Log("Acquiring certificates ...")
err = m.config.ManageSync(ctx, added)
if err != nil {
return fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(added, ","), err)
}
m.logger.WithField("hostnames", added).Info().Log("Successfully acquired certificate")
}
if len(removed) != 0 {
m.logger.WithField("hostnames", removed).Info().Log("Unmanage certificates")
m.config.Unmanage(removed)
}
m.lock.Lock()
m.hostnames = make([]string, len(hostnames))
copy(m.hostnames, hostnames)
m.lock.Unlock()
return err
}
// ManageCertificates is the same as AcquireCertificates but it does it in the background.
func (m *manager) ManageCertificates(ctx context.Context, hostnames []string) error {
m.lock.Lock()
added, removed := diffStringSlice(hostnames, m.hostnames)
m.hostnames = make([]string, len(hostnames))
copy(m.hostnames, hostnames)
m.lock.Unlock()
if len(removed) != 0 {
m.logger.WithField("hostnames", removed).Info().Log("Unmanage certificates")
m.config.Unmanage(removed)
}
if len(added) == 0 {
return nil
}
m.logger.WithField("hostnames", added).Info().Log("Acquiring certificates")
return m.config.ManageAsync(ctx, added)
}
// ManagedNames returns a list of the currently managed domain names.
func (m *manager) ManagedNames() []string {
m.lock.Lock()
defer m.lock.Unlock()
hostnames := make([]string, len(m.hostnames))
copy(hostnames, m.hostnames)
return hostnames
}
// TLSConfig is an opinionated method that returns a recommended, modern
// TLS configuration that can be used to configure TLS listeners. Aside
// from safe, modern defaults, this method sets one critical field on the
// TLS config which is required to enable automatic certificate
// management: GetCertificate.
//
// The GetCertificate field is necessary to get certificates from memory
// or storage, including both manual and automated certificates. You
// should only change this field if you know what you are doing.
func (m *manager) TLSConfig() *tls.Config {
return &tls.Config{
GetCertificate: m.GetCertificate,
// the rest recommended for modern TLS servers
MinVersion: tls.VersionTLS12,
CurvePreferences: []tls.CurveID{
tls.X25519,
tls.CurveP256,
},
CipherSuites: preferredDefaultCipherSuites(),
PreferServerCipherSuites: true,
}
}
// preferredDefaultCipherSuites returns an appropriate
// cipher suite to use depending on hardware support
// for AES-NI.
//
// See https://github.com/mholt/caddy/issues/1674
// Copied from https://github.com/caddyserver/certmagic/blob/d8e706f9b5011ecbaf20d3c1641e5446ad453613/crypto.go#L299
func preferredDefaultCipherSuites() []uint16 {
if cpuid.CPU.Supports(cpuid.AESNI) {
return defaultCiphersPreferAES
}
return defaultCiphersPreferChaCha
}
var (
defaultCiphersPreferAES = []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
}
defaultCiphersPreferChaCha = []uint16{
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
}
)
// diffHostnames returns a list of newly added hostnames and a list of removed hostnames based
// the provided list and the list of currently managed hostnames.
func diffStringSlice(next, current []string) ([]string, []string) {
added, removed := []string{}, []string{}
currentMap := map[string]struct{}{}
for _, name := range current {
currentMap[name] = struct{}{}
}
for _, name := range next {
if _, ok := currentMap[name]; ok {
delete(currentMap, name)
continue
}
added = append(added, name)
}
for name := range currentMap {
removed = append(removed, name)
}
return added, removed
}

View File

@@ -21,7 +21,6 @@ import (
"strings" "strings"
"github.com/datarhei/core/v16/cluster/client" "github.com/datarhei/core/v16/cluster/client"
"github.com/datarhei/core/v16/http/errorhandler"
"github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/http/handler/util"
httplog "github.com/datarhei/core/v16/http/log" httplog "github.com/datarhei/core/v16/http/log"
mwlog "github.com/datarhei/core/v16/http/middleware/log" mwlog "github.com/datarhei/core/v16/http/middleware/log"
@@ -75,7 +74,7 @@ func NewAPI(config APIConfig) (API, error) {
a.router = echo.New() a.router = echo.New()
a.router.Debug = true a.router.Debug = true
a.router.HTTPErrorHandler = errorhandler.HTTPErrorHandler a.router.HTTPErrorHandler = ErrorHandler
a.router.Validator = validator.New() a.router.Validator = validator.New()
a.router.HideBanner = true a.router.HideBanner = true
a.router.HidePort = true a.router.HidePort = true
@@ -96,28 +95,10 @@ func NewAPI(config APIConfig) (API, error) {
doc := a.router.Group("/v1/swagger/*") doc := a.router.Group("/v1/swagger/*")
doc.GET("", echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI"))) doc.GET("", echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI")))
a.router.GET("/", func(c echo.Context) error { a.router.GET("/", a.Version)
return c.JSON(http.StatusOK, Version.String()) a.router.GET("/v1/about", a.Version)
})
a.router.GET("/v1/about", func(c echo.Context) error { a.router.GET("/v1/barrier/:name", a.Barrier)
return c.JSON(http.StatusOK, Version.String())
})
a.router.GET("/v1/ready", func(c echo.Context) error {
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
err := a.cluster.IsReady(origin)
if err != nil {
return Err(http.StatusLocked, "", "not ready yet")
}
return c.JSON(http.StatusOK, "OK")
})
a.router.POST("/v1/server", a.AddServer) a.router.POST("/v1/server", a.AddServer)
a.router.DELETE("/v1/server/:id", a.RemoveServer) a.router.DELETE("/v1/server/:id", a.RemoveServer)
@@ -139,6 +120,7 @@ func NewAPI(config APIConfig) (API, error) {
a.router.DELETE("/v1/lock/:name", a.Unlock) a.router.DELETE("/v1/lock/:name", a.Unlock)
a.router.POST("/v1/kv", a.SetKV) a.router.POST("/v1/kv", a.SetKV)
a.router.GET("/v1/kv/:key", a.GetKV)
a.router.DELETE("/v1/kv/:key", a.UnsetKV) a.router.DELETE("/v1/kv/:key", a.UnsetKV)
a.router.GET("/v1/core", a.CoreAPIAddress) a.router.GET("/v1/core", a.CoreAPIAddress)
@@ -157,6 +139,34 @@ func (a *api) Shutdown(ctx context.Context) error {
return a.router.Shutdown(ctx) return a.router.Shutdown(ctx)
} }
// Version returns the version of the cluster
// @Summary The cluster version
// @Description The cluster version
// @Tags v1.0.0
// @ID cluster-1-version
// @Produce json
// @Success 200 {string} string
// @Success 500 {object} Error
// @Router /v1/version [get]
func (a *api) Version(c echo.Context) error {
return c.JSON(http.StatusOK, Version.String())
}
// Barrier returns if the barrier already has been passed
// @Summary Has the barrier already has been passed
// @Description Has the barrier already has been passed
// @Tags v1.0.0
// @ID cluster-1-barrier
// @Produce json
// @Param name path string true "Barrier name"
// @Success 200 {string} string
// @Success 404 {object} Error
// @Router /v1/barrier/{name} [get]
func (a *api) Barrier(c echo.Context) error {
name := util.PathParam(c, "name")
return c.JSON(http.StatusOK, a.cluster.GetBarrier(name))
}
// AddServer adds a new server to the cluster // AddServer adds a new server to the cluster
// @Summary Add a new server // @Summary Add a new server
// @Description Add a new server to the cluster // @Description Add a new server to the cluster
@@ -239,7 +249,7 @@ func (a *api) RemoveServer(c echo.Context) error {
// @ID cluster-1-snapshot // @ID cluster-1-snapshot
// @Produce application/octet-stream // @Produce application/octet-stream
// @Success 200 {file} byte // @Success 200 {file} byte
// @Success 500 {array} Error // @Success 500 {object} Error
// @Router /v1/snapshot [get] // @Router /v1/snapshot [get]
func (a *api) Snapshot(c echo.Context) error { func (a *api) Snapshot(c echo.Context) error {
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -747,8 +757,8 @@ func (a *api) SetKV(c echo.Context) error {
} }
// UnsetKV removes a key // UnsetKV removes a key
// @Summary Removes a key // @Summary Remove a key
// @Description Removes a key // @Description Remove a key
// @Tags v1.0.0 // @Tags v1.0.0
// @ID cluster-1-kv-unset // @ID cluster-1-kv-unset
// @Produce json // @Produce json
@@ -773,6 +783,7 @@ func (a *api) UnsetKV(c echo.Context) error {
err := a.cluster.UnsetKV(origin, key) err := a.cluster.UnsetKV(origin, key)
if err != nil { if err != nil {
if err == fs.ErrNotExist { if err == fs.ErrNotExist {
a.logger.Debug().WithError(err).WithField("key", key).Log("Delete key: not found")
return Err(http.StatusNotFound, "", "%s", err.Error()) return Err(http.StatusNotFound, "", "%s", err.Error())
} }
a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to remove key") a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to remove key")
@@ -782,6 +793,48 @@ func (a *api) UnsetKV(c echo.Context) error {
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
} }
// GetKV fetches a key
// @Summary Fetch a key
// @Description Fetch a key
// @Tags v1.0.0
// @ID cluster-1-kv-get
// @Produce json
// @Param name path string true "Key name"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Failure 404 {object} Error
// @Failure 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/kv/{key} [get]
func (a *api) GetKV(c echo.Context) error {
key := util.PathParam(c, "key")
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
a.logger.Debug().WithField("key", key).Log("Get key")
value, updatedAt, err := a.cluster.GetKV(origin, key)
if err != nil {
if err == fs.ErrNotExist {
a.logger.Debug().WithError(err).WithField("key", key).Log("Get key: not found")
return Err(http.StatusNotFound, "", "%s", err.Error())
}
a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to retrieve key")
return Err(http.StatusInternalServerError, "", "unable to retrieve key: %s", err.Error())
}
res := client.GetKVResponse{
Value: value,
UpdatedAt: updatedAt,
}
return c.JSON(http.StatusOK, res)
}
// Error represents an error response of the API // Error represents an error response of the API
type Error struct { type Error struct {
Code int `json:"code" jsonschema:"required" format:"int"` Code int `json:"code" jsonschema:"required" format:"int"`
@@ -816,3 +869,50 @@ func Err(code int, message string, args ...interface{}) Error {
return e return e
} }
// ErrorHandler is a genral handler for echo handler errors
func ErrorHandler(err error, c echo.Context) {
var code int = 0
var details []string
message := ""
if he, ok := err.(Error); ok {
code = he.Code
message = he.Message
details = he.Details
} else if he, ok := err.(*echo.HTTPError); ok {
if he.Internal != nil {
if herr, ok := he.Internal.(*echo.HTTPError); ok {
he = herr
}
}
code = he.Code
message = http.StatusText(he.Code)
if len(message) == 0 {
switch code {
case 509:
message = "Bandwith limit exceeded"
default:
}
}
details = strings.Split(fmt.Sprintf("%v", he.Message), "\n")
} else {
code = http.StatusInternalServerError
message = http.StatusText(http.StatusInternalServerError)
details = strings.Split(fmt.Sprintf("%s", err), "\n")
}
// Send response
if !c.Response().Committed {
if c.Request().Method == http.MethodHead {
c.NoContent(code)
} else {
c.JSON(code, Error{
Code: code,
Message: message,
Details: details,
})
}
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/fs"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
@@ -59,6 +60,11 @@ type SetKVRequest struct {
Value string `json:"value"` Value string `json:"value"`
} }
type GetKVResponse struct {
Value string `json:"value"`
UpdatedAt time.Time `json:"updated_at"`
}
type APIClient struct { type APIClient struct {
Address string Address string
Client *http.Client Client *http.Client
@@ -79,6 +85,21 @@ func (c *APIClient) Version() (string, error) {
return version, nil return version, nil
} }
func (c *APIClient) Barrier(name string) (bool, error) {
data, err := c.call(http.MethodGet, "/v1/barrier/"+url.PathEscape(name), "application/json", nil, "")
if err != nil {
return false, err
}
var passed bool
err = json.Unmarshal(data, &passed)
if err != nil {
return false, err
}
return passed, nil
}
func (c *APIClient) CoreAPIAddress() (string, error) { func (c *APIClient) CoreAPIAddress() (string, error) {
data, err := c.call(http.MethodGet, "/v1/core", "", nil, "") data, err := c.call(http.MethodGet, "/v1/core", "", nil, "")
if err != nil { if err != nil {
@@ -245,20 +266,40 @@ func (c *APIClient) SetKV(origin string, r SetKVRequest) error {
func (c *APIClient) UnsetKV(origin string, key string) error { func (c *APIClient) UnsetKV(origin string, key string) error {
_, err := c.call(http.MethodDelete, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin) _, err := c.call(http.MethodDelete, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin)
if err != nil {
e, ok := err.(httpapi.Error)
if ok && e.Code == 404 {
return fs.ErrNotExist
}
}
return err return err
} }
func (c *APIClient) GetKV(origin string, key string) (string, time.Time, error) {
data, err := c.call(http.MethodGet, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin)
if err != nil {
e, ok := err.(httpapi.Error)
if ok && e.Code == 404 {
return "", time.Time{}, fs.ErrNotExist
}
return "", time.Time{}, err
}
res := GetKVResponse{}
err = json.Unmarshal(data, &res)
if err != nil {
return "", time.Time{}, err
}
return res.Value, res.UpdatedAt, nil
}
func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) { func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin) return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin)
} }
func (c *APIClient) IsReady(origin string) error {
_, err := c.call(http.MethodGet, "/v1/ready", "application/json", nil, origin)
return err
}
func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) { func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) {
if len(c.Address) == 0 { if len(c.Address) == 0 {
return nil, fmt.Errorf("no address defined") return nil, fmt.Errorf("no address defined")

View File

@@ -44,7 +44,9 @@ type Cluster interface {
CoreConfig() *config.Config CoreConfig() *config.Config
About() (ClusterAbout, error) About() (ClusterAbout, error)
IsReady(origin string) error IsClusterDegraded() (bool, error)
IsDegraded() (bool, error)
GetBarrier(name string) bool
Join(origin, id, raftAddress, peerAddress string) error Join(origin, id, raftAddress, peerAddress string) error
Leave(origin, id string) error // gracefully remove a node from the cluster Leave(origin, id string) error // gracefully remove a node from the cluster
@@ -76,7 +78,7 @@ type Cluster interface {
SetKV(origin, key, value string) error SetKV(origin, key, value string) error
UnsetKV(origin, key string) error UnsetKV(origin, key string) error
GetKV(key string) (string, time.Time, error) GetKV(origin, key string) (string, time.Time, error)
ListKV(prefix string) map[string]store.Value ListKV(prefix string) map[string]store.Value
ProxyReader() proxy.ProxyReader ProxyReader() proxy.ProxyReader
@@ -155,7 +157,8 @@ type cluster struct {
nodes map[string]*clusterNode nodes map[string]*clusterNode
nodesLock sync.RWMutex nodesLock sync.RWMutex
ready bool barrier map[string]bool
barrierLock sync.RWMutex
limiter net.IPLimiter limiter net.IPLimiter
} }
@@ -187,6 +190,8 @@ func New(ctx context.Context, config Config) (Cluster, error) {
config: config.CoreConfig, config: config.CoreConfig,
nodes: map[string]*clusterNode{}, nodes: map[string]*clusterNode{},
barrier: map[string]bool{},
limiter: config.IPLimiter, limiter: config.IPLimiter,
} }
@@ -352,6 +357,16 @@ func New(ctx context.Context, config Config) (Cluster, error) {
go c.monitorLeadership() go c.monitorLeadership()
go c.sentinel() go c.sentinel()
err = c.setup(ctx)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to setup cluster: %w", err)
}
return c, nil
}
func (c *cluster) setup(ctx context.Context) error {
// Wait for a leader to be selected // Wait for a leader to be selected
c.logger.Info().Log("Waiting for a leader to be elected ...") c.logger.Info().Log("Waiting for a leader to be elected ...")
@@ -364,8 +379,7 @@ func New(ctx context.Context, config Config) (Cluster, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
c.Shutdown() return fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default: default:
} }
@@ -374,7 +388,7 @@ func New(ctx context.Context, config Config) (Cluster, error) {
c.logger.Info().Log("Leader has been elected") c.logger.Info().Log("Leader has been elected")
// Wait for cluster to leave degraded mode // Wait for all cluster nodes to leave degraded mode
c.logger.Info().Log("Waiting for cluster to become operational ...") c.logger.Info().Log("Waiting for cluster to become operational ...")
@@ -384,125 +398,128 @@ func New(ctx context.Context, config Config) (Cluster, error) {
break break
} }
c.logger.Warn().WithError(err).Log("Cluster is degraded") c.logger.Warn().WithError(err).Log("Cluster is in degraded state")
select { select {
case <-ctx.Done(): case <-ctx.Done():
c.Shutdown() return fmt.Errorf("starting cluster has been aborted: %w: %w", ctx.Err(), err)
return nil, fmt.Errorf("starting cluster has been aborted: %w: %w", ctx.Err(), err)
default: default:
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
err := c.Barrier(ctx, "operational")
if err != nil {
return fmt.Errorf("failed on barrier: %w", err)
}
c.logger.Info().Log("Cluster is operational") c.logger.Info().Log("Cluster is operational")
if c.isTLSRequired { if c.isTLSRequired {
c.logger.Info().Log("Waiting for TLS certificates ...")
// Create certificate manager // Create certificate manager
names, err := c.getClusterHostnames() hostnames, err := c.getClusterHostnames()
if err != nil { if err != nil {
c.Shutdown() return fmt.Errorf("tls: failed to assemble list of all configured hostnames: %w", err)
return nil, fmt.Errorf("tls: failed to assemble list of all configured hostnames: %w", err)
} }
if len(names) == 0 { if len(hostnames) == 0 {
c.Shutdown() return fmt.Errorf("no hostnames are configured")
return nil, fmt.Errorf("tls: no hostnames are configured")
} }
kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS")) kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS"))
if err != nil { if err != nil {
c.Shutdown() return fmt.Errorf("tls: cluster KVS: %w", err)
return nil, fmt.Errorf("tls: cluster KVS: %w", err)
} }
storage, err := NewClusterStorage(kvs, "core-cluster-certificates") storage, err := NewClusterStorage(kvs, "core-cluster-certificates", c.logger.WithComponent("KVS"))
if err != nil { if err != nil {
c.Shutdown() return fmt.Errorf("tls: certificate store: %w", err)
return nil, fmt.Errorf("tls: certificate store: %w", err)
} }
manager, err := autocert.New(autocert.Config{ manager, err := autocert.New(autocert.Config{
Storage: storage, Storage: storage,
DefaultHostname: names[0], DefaultHostname: hostnames[0],
EmailAddress: c.config.TLS.Email, EmailAddress: c.config.TLS.Email,
IsProduction: !c.config.TLS.Staging, IsProduction: !c.config.TLS.Staging,
Logger: c.logger.WithComponent("Let's Encrypt"), Logger: c.logger.WithComponent("Let's Encrypt"),
}) })
if err != nil { if err != nil {
c.Shutdown() return fmt.Errorf("tls: certificate manager: %w", err)
return nil, fmt.Errorf("tls: certificate manager: %w", err)
} }
c.certManager = manager c.certManager = manager
if c.IsRaftLeader() { resctx, rescancel := context.WithCancel(ctx)
// Acquire certificates defer rescancel()
err = manager.AcquireCertificates(ctx, c.config.Address, names)
if err != nil { err = manager.HTTPChallengeResolver(resctx, c.config.Address)
c.Shutdown() if err != nil {
return nil, fmt.Errorf("tls: failed to acquire certificates: %w", err) return fmt.Errorf("tls: failed to start the HTTP challenge resolver: %w", err)
}
} }
// We have to wait for all nodes to have the HTTP challenge resolver started
err = c.Barrier(ctx, "acme")
if err != nil {
return fmt.Errorf("tls: failed on barrier: %w", err)
}
// Acquire certificates, all nodes can do this at the same time because everything
// is synched via the storage.
err = manager.AcquireCertificates(ctx, hostnames)
if err != nil {
return fmt.Errorf("tls: failed to acquire certificates: %w", err)
}
rescancel()
c.logger.Info().Log("TLS certificates acquired")
} }
if !c.IsRaftLeader() { c.logger.Info().Log("Waiting for cluster to become ready ...")
tempctx, cancel := context.WithCancel(context.Background())
if c.isTLSRequired { err = c.Barrier(ctx, "ready")
// All followers forward any HTTP requests to the leader such that it can respond to the HTTP challenge if err != nil {
leaderAddress, _ := c.raft.Leader() return fmt.Errorf("failed on barrier: %w", err)
leader, err := c.CoreAPIAddress(leaderAddress)
if err != nil {
cancel()
c.Shutdown()
return nil, fmt.Errorf("unable to find leader address: %w", err)
}
url, err := url.Parse(leader)
if err != nil {
cancel()
return nil, fmt.Errorf("unable to parse leader address: %w", err)
}
url.Scheme = "http"
url.Path = "/"
url.User = nil
url.RawQuery = ""
go func() {
c.logger.Info().WithField("leader", url.String()).Log("Forwarding ACME challenges to leader")
autocert.ProxyHTTPChallenge(tempctx, c.config.Address, url)
c.logger.Info().WithField("leader", url.String()).Log("Stopped forwarding ACME challenges to leader")
}()
}
for {
// Ask leader if it is ready
err := c.IsReady("")
if err == nil {
cancel()
break
}
select {
case <-ctx.Done():
cancel()
c.Shutdown()
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default:
}
time.Sleep(time.Second)
}
} }
c.ready = true
c.logger.Info().Log("Cluster is ready") c.logger.Info().Log("Cluster is ready")
return c, nil return nil
}
func (c *cluster) GetBarrier(name string) bool {
c.barrierLock.RLock()
defer c.barrierLock.RUnlock()
return c.barrier[name]
}
func (c *cluster) Barrier(ctx context.Context, name string) error {
c.barrierLock.Lock()
c.barrier[name] = true
c.barrierLock.Unlock()
for {
ok, err := c.getClusterBarrier(name)
if ok {
break
}
c.logger.Warn().WithField("name", name).WithError(err).Log("Waiting for barrier")
select {
case <-ctx.Done():
return fmt.Errorf("barrier %s: starting cluster has been aborted: %w: %w", name, ctx.Err(), err)
default:
}
time.Sleep(time.Second)
}
return nil
} }
func (c *cluster) Address() string { func (c *cluster) Address() string {
@@ -924,7 +941,7 @@ func (c *cluster) trackNodeChanges() {
c.nodesLock.Unlock() c.nodesLock.Unlock()
// Put the cluster in "degraded" mode in case there's a mismatch in expected values // Put the cluster in "degraded" mode in case there's a mismatch in expected values
err = c.checkClusterNodes() _, err = c.checkClusterNodes()
c.stateLock.Lock() c.stateLock.Lock()
if err != nil { if err != nil {
@@ -948,36 +965,62 @@ func (c *cluster) trackNodeChanges() {
c.isCoreDegradedErr = nil c.isCoreDegradedErr = nil
} }
c.stateLock.Unlock() c.stateLock.Unlock()
/*
if c.isTLSRequired {
// Update list of managed hostnames
if c.certManager != nil {
c.certManager.ManageCertificates(context.Background(), hostnames)
}
}
*/
case <-c.shutdownCh: case <-c.shutdownCh:
return return
} }
} }
} }
func (c *cluster) checkClusterNodes() error { // checkClusterNodes returns a list of all hostnames configured on all nodes. The
// returned list will not contain any duplicates. An error is returned in case the
// node is not compatible.
func (c *cluster) checkClusterNodes() ([]string, error) {
hostnames := map[string]struct{}{}
c.nodesLock.RLock() c.nodesLock.RLock()
defer c.nodesLock.RUnlock() defer c.nodesLock.RUnlock()
for id, node := range c.nodes { for id, node := range c.nodes {
if status, err := node.Status(); status == "offline" { if status, err := node.Status(); status == "offline" {
return fmt.Errorf("node %s is offline: %w", id, err) return nil, fmt.Errorf("node %s is offline: %w", id, err)
} }
version := node.Version() version := node.Version()
if err := verifyClusterVersion(version); err != nil { if err := verifyClusterVersion(version); err != nil {
return fmt.Errorf("node %s has a different version: %s: %w", id, version, err) return nil, fmt.Errorf("node %s has a different version: %s: %w", id, version, err)
} }
config, err := node.CoreConfig() config, err := node.CoreConfig()
if err != nil { if err != nil {
return fmt.Errorf("node %s has no configuration available: %w", id, err) return nil, fmt.Errorf("node %s has no configuration available: %w", id, err)
} }
if err := verifyClusterConfig(c.config, config); err != nil { if err := verifyClusterConfig(c.config, config); err != nil {
return fmt.Errorf("node %s has a different configuration: %w", id, err) return nil, fmt.Errorf("node %s has a different configuration: %w", id, err)
}
for _, name := range config.Host.Name {
hostnames[name] = struct{}{}
} }
} }
return nil names := []string{}
for key := range hostnames {
names = append(names, key)
}
sort.Strings(names)
return names, nil
} }
func (c *cluster) checkClusterCoreNodes() error { func (c *cluster) checkClusterCoreNodes() error {
@@ -993,6 +1036,8 @@ func (c *cluster) checkClusterCoreNodes() error {
return nil return nil
} }
// getClusterHostnames return a list of all hostnames configured on all nodes. The
// returned list will not contain any duplicates.
func (c *cluster) getClusterHostnames() ([]string, error) { func (c *cluster) getClusterHostnames() ([]string, error) {
hostnames := map[string]struct{}{} hostnames := map[string]struct{}{}
@@ -1021,6 +1066,21 @@ func (c *cluster) getClusterHostnames() ([]string, error) {
return names, nil return names, nil
} }
// getClusterBarrier returns whether all nodes are currently on the same barrier.
func (c *cluster) getClusterBarrier(name string) (bool, error) {
c.nodesLock.RLock()
defer c.nodesLock.RUnlock()
for _, node := range c.nodes {
ok, err := node.Barrier(name)
if !ok {
return false, err
}
}
return true, nil
}
func verifyClusterVersion(v string) error { func verifyClusterVersion(v string) error {
version, err := ParseClusterVersion(v) version, err := ParseClusterVersion(v)
if err != nil { if err != nil {
@@ -1500,7 +1560,15 @@ func (c *cluster) UnsetKV(origin, key string) error {
return c.applyCommand(cmd) return c.applyCommand(cmd)
} }
func (c *cluster) GetKV(key string) (string, time.Time, error) { func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
if ok, _ := c.IsClusterDegraded(); ok {
return "", time.Time{}, ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.GetKV(origin, key)
}
value, err := c.store.GetFromKVS(key) value, err := c.store.GetFromKVS(key)
if err != nil { if err != nil {
return "", time.Time{}, err return "", time.Time{}, err
@@ -1515,22 +1583,6 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value {
return storeValues return storeValues
} }
func (c *cluster) IsReady(origin string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.IsReady(origin)
}
if !c.ready {
return fmt.Errorf("no ready yet")
}
return nil
}
func (c *cluster) applyCommand(cmd *store.Command) error { func (c *cluster) applyCommand(cmd *store.Command) error {
b, err := json.Marshal(cmd) b, err := json.Marshal(cmd)
if err != nil { if err != nil {

View File

@@ -24,6 +24,42 @@ const docTemplateClusterAPI = `{
"host": "{{.Host}}", "host": "{{.Host}}",
"basePath": "{{.BasePath}}", "basePath": "{{.BasePath}}",
"paths": { "paths": {
"/v1/barrier/{name}": {
"get": {
"description": "Has the barrier already has been passed",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Has the barrier already has been passed",
"operationId": "cluster-1-barrier",
"parameters": [
{
"type": "string",
"description": "Barrier name",
"name": "name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/core": { "/v1/core": {
"get": { "get": {
"description": "Core API address and login of this node", "description": "Core API address and login of this node",
@@ -333,15 +369,67 @@ const docTemplateClusterAPI = `{
} }
}, },
"/v1/kv/{key}": { "/v1/kv/{key}": {
"delete": { "get": {
"description": "Removes a key", "description": "Fetch a key",
"produces": [ "produces": [
"application/json" "application/json"
], ],
"tags": [ "tags": [
"v1.0.0" "v1.0.0"
], ],
"summary": "Removes a key", "summary": "Fetch a key",
"operationId": "cluster-1-kv-get",
"parameters": [
{
"type": "string",
"description": "Key name",
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
},
"delete": {
"description": "Remove a key",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Remove a key",
"operationId": "cluster-1-kv-unset", "operationId": "cluster-1-kv-unset",
"parameters": [ "parameters": [
{ {
@@ -910,10 +998,34 @@ const docTemplateClusterAPI = `{
"500": { "500": {
"description": "Internal Server Error", "description": "Internal Server Error",
"schema": { "schema": {
"type": "array", "$ref": "#/definitions/cluster.Error"
"items": { }
"$ref": "#/definitions/cluster.Error" }
} }
}
},
"/v1/version": {
"get": {
"description": "The cluster version",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "The cluster version",
"operationId": "cluster-1-version",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
} }
} }
} }

View File

@@ -16,6 +16,42 @@
}, },
"basePath": "/", "basePath": "/",
"paths": { "paths": {
"/v1/barrier/{name}": {
"get": {
"description": "Has the barrier already has been passed",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Has the barrier already has been passed",
"operationId": "cluster-1-barrier",
"parameters": [
{
"type": "string",
"description": "Barrier name",
"name": "name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/core": { "/v1/core": {
"get": { "get": {
"description": "Core API address and login of this node", "description": "Core API address and login of this node",
@@ -325,15 +361,67 @@
} }
}, },
"/v1/kv/{key}": { "/v1/kv/{key}": {
"delete": { "get": {
"description": "Removes a key", "description": "Fetch a key",
"produces": [ "produces": [
"application/json" "application/json"
], ],
"tags": [ "tags": [
"v1.0.0" "v1.0.0"
], ],
"summary": "Removes a key", "summary": "Fetch a key",
"operationId": "cluster-1-kv-get",
"parameters": [
{
"type": "string",
"description": "Key name",
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
},
"delete": {
"description": "Remove a key",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Remove a key",
"operationId": "cluster-1-kv-unset", "operationId": "cluster-1-kv-unset",
"parameters": [ "parameters": [
{ {
@@ -902,10 +990,34 @@
"500": { "500": {
"description": "Internal Server Error", "description": "Internal Server Error",
"schema": { "schema": {
"type": "array", "$ref": "#/definitions/cluster.Error"
"items": { }
"$ref": "#/definitions/cluster.Error" }
} }
}
},
"/v1/version": {
"get": {
"description": "The cluster version",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "The cluster version",
"operationId": "cluster-1-version",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
} }
} }
} }

View File

@@ -682,6 +682,30 @@ info:
title: datarhei Core Cluster API title: datarhei Core Cluster API
version: "1.0" version: "1.0"
paths: paths:
/v1/barrier/{name}:
get:
description: Has the barrier already has been passed
operationId: cluster-1-barrier
parameters:
- description: Barrier name
in: path
name: name
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"404":
description: Not Found
schema:
$ref: '#/definitions/cluster.Error'
summary: Has the barrier already has been passed
tags:
- v1.0.0
/v1/core: /v1/core:
get: get:
description: Core API address and login of this node description: Core API address and login of this node
@@ -890,7 +914,7 @@ paths:
- v1.0.0 - v1.0.0
/v1/kv/{key}: /v1/kv/{key}:
delete: delete:
description: Removes a key description: Remove a key
operationId: cluster-1-kv-unset operationId: cluster-1-kv-unset
parameters: parameters:
- description: Key name - description: Key name
@@ -921,7 +945,42 @@ paths:
description: Loop Detected description: Loop Detected
schema: schema:
$ref: '#/definitions/cluster.Error' $ref: '#/definitions/cluster.Error'
summary: Removes a key summary: Remove a key
tags:
- v1.0.0
get:
description: Fetch a key
operationId: cluster-1-kv-get
parameters:
- description: Key name
in: path
name: name
required: true
type: string
- description: Origin ID of request
in: header
name: X-Cluster-Origin
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"404":
description: Not Found
schema:
$ref: '#/definitions/cluster.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/cluster.Error'
"508":
description: Loop Detected
schema:
$ref: '#/definitions/cluster.Error'
summary: Fetch a key
tags: tags:
- v1.0.0 - v1.0.0
/v1/lock: /v1/lock:
@@ -1276,10 +1335,26 @@ paths:
"500": "500":
description: Internal Server Error description: Internal Server Error
schema: schema:
items: $ref: '#/definitions/cluster.Error'
$ref: '#/definitions/cluster.Error'
type: array
summary: Cluster DB snapshot summary: Cluster DB snapshot
tags: tags:
- v1.0.0 - v1.0.0
/v1/version:
get:
description: The cluster version
operationId: cluster-1-version
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/cluster.Error'
summary: The cluster version
tags:
- v1.0.0
swagger: "2.0" swagger: "2.0"

View File

@@ -38,8 +38,7 @@ type Forwarder interface {
SetKV(origin, key, value string) error SetKV(origin, key, value string) error
UnsetKV(origin, key string) error UnsetKV(origin, key string) error
GetKV(origin, key string) (string, time.Time, error)
IsReady(origin string) error
} }
type forwarder struct { type forwarder struct {
@@ -342,7 +341,7 @@ func (f *forwarder) UnsetKV(origin, key string) error {
return client.UnsetKV(origin, key) return client.UnsetKV(origin, key)
} }
func (f *forwarder) IsReady(origin string) error { func (f *forwarder) GetKV(origin, key string) (string, time.Time, error) {
if origin == "" { if origin == "" {
origin = f.id origin = f.id
} }
@@ -351,5 +350,5 @@ func (f *forwarder) IsReady(origin string) error {
client := f.client client := f.client
f.lock.RUnlock() f.lock.RUnlock()
return client.IsReady(origin) return client.GetKV(origin, key)
} }

View File

@@ -93,6 +93,7 @@ func (s *clusterKVS) DeleteLock(name string) error {
} }
func (s *clusterKVS) ListLocks() map[string]time.Time { func (s *clusterKVS) ListLocks() map[string]time.Time {
s.logger.Debug().Log("List locks")
return s.cluster.ListLocks() return s.cluster.ListLocks()
} }
@@ -111,10 +112,11 @@ func (s *clusterKVS) UnsetKV(key string) error {
func (s *clusterKVS) GetKV(key string) (string, time.Time, error) { func (s *clusterKVS) GetKV(key string) (string, time.Time, error) {
s.logger.Debug().WithField("key", key).Log("Get KV") s.logger.Debug().WithField("key", key).Log("Get KV")
return s.cluster.GetKV(key) return s.cluster.GetKV("", key)
} }
func (s *clusterKVS) ListKV(prefix string) map[string]store.Value { func (s *clusterKVS) ListKV(prefix string) map[string]store.Value {
s.logger.Debug().Log("List KV")
return s.cluster.ListKV(prefix) return s.cluster.ListKV(prefix)
} }

View File

@@ -193,6 +193,10 @@ func (n *clusterNode) CoreAPIAddress() (string, error) {
return n.client.CoreAPIAddress() return n.client.CoreAPIAddress()
} }
func (n *clusterNode) Barrier(name string) (bool, error) {
return n.client.Barrier(name)
}
func (n *clusterNode) Proxy() proxy.Node { func (n *clusterNode) Proxy() proxy.Node {
return n.proxyNode return n.proxyNode
} }

View File

@@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/caddyserver/certmagic" "github.com/caddyserver/certmagic"
"github.com/datarhei/core/v16/log"
) )
type clusterStorage struct { type clusterStorage struct {
@@ -18,13 +19,20 @@ type clusterStorage struct {
prefix string prefix string
locks map[string]*Lock locks map[string]*Lock
muLocks sync.Mutex muLocks sync.Mutex
logger log.Logger
} }
func NewClusterStorage(kvs KVS, prefix string) (certmagic.Storage, error) { func NewClusterStorage(kvs KVS, prefix string, logger log.Logger) (certmagic.Storage, error) {
s := &clusterStorage{ s := &clusterStorage{
kvs: kvs, kvs: kvs,
prefix: prefix, prefix: prefix,
locks: map[string]*Lock{}, locks: map[string]*Lock{},
logger: logger,
}
if s.logger == nil {
s.logger = log.New("")
} }
return s, nil return s, nil
@@ -39,6 +47,7 @@ func (s *clusterStorage) unprefixKey(key string) string {
} }
func (s *clusterStorage) Lock(ctx context.Context, name string) error { func (s *clusterStorage) Lock(ctx context.Context, name string) error {
s.logger.Debug().WithField("name", name).Log("StorageLock")
for { for {
lock, err := s.kvs.CreateLock(s.prefixKey(name), time.Now().Add(time.Minute)) lock, err := s.kvs.CreateLock(s.prefixKey(name), time.Now().Add(time.Minute))
if err == nil { if err == nil {
@@ -65,13 +74,18 @@ func (s *clusterStorage) Lock(ctx context.Context, name string) error {
} }
func (s *clusterStorage) Unlock(ctx context.Context, name string) error { func (s *clusterStorage) Unlock(ctx context.Context, name string) error {
s.logger.Debug().WithField("name", name).Log("StorageUnlock")
err := s.kvs.DeleteLock(s.prefixKey(name)) err := s.kvs.DeleteLock(s.prefixKey(name))
if err != nil { if err != nil {
return err return err
} }
s.muLocks.Lock() s.muLocks.Lock()
delete(s.locks, name) lock, ok := s.locks[name]
if ok {
lock.Unlock()
delete(s.locks, name)
}
s.muLocks.Unlock() s.muLocks.Unlock()
return nil return nil
@@ -79,14 +93,17 @@ func (s *clusterStorage) Unlock(ctx context.Context, name string) error {
// Store puts value at key. // Store puts value at key.
func (s *clusterStorage) Store(ctx context.Context, key string, value []byte) error { func (s *clusterStorage) Store(ctx context.Context, key string, value []byte) error {
s.logger.Debug().WithField("key", key).Log("StorageStore")
encodedValue := base64.StdEncoding.EncodeToString(value) encodedValue := base64.StdEncoding.EncodeToString(value)
return s.kvs.SetKV(s.prefixKey(key), encodedValue) return s.kvs.SetKV(s.prefixKey(key), encodedValue)
} }
// Load retrieves the value at key. // Load retrieves the value at key.
func (s *clusterStorage) Load(ctx context.Context, key string) ([]byte, error) { func (s *clusterStorage) Load(ctx context.Context, key string) ([]byte, error) {
s.logger.Debug().WithField("key", key).Log("StorageLoad")
encodedValue, _, err := s.kvs.GetKV(s.prefixKey(key)) encodedValue, _, err := s.kvs.GetKV(s.prefixKey(key))
if err != nil { if err != nil {
s.logger.Debug().WithError(err).WithField("key", key).Log("StorageLoad")
return nil, err return nil, err
} }
@@ -97,12 +114,14 @@ func (s *clusterStorage) Load(ctx context.Context, key string) ([]byte, error) {
// returned only if the key still exists // returned only if the key still exists
// when the method returns. // when the method returns.
func (s *clusterStorage) Delete(ctx context.Context, key string) error { func (s *clusterStorage) Delete(ctx context.Context, key string) error {
s.logger.Debug().WithField("key", key).Log("StorageDelete")
return s.kvs.UnsetKV(s.prefixKey(key)) return s.kvs.UnsetKV(s.prefixKey(key))
} }
// Exists returns true if the key exists // Exists returns true if the key exists
// and there was no error checking. // and there was no error checking.
func (s *clusterStorage) Exists(ctx context.Context, key string) bool { func (s *clusterStorage) Exists(ctx context.Context, key string) bool {
s.logger.Debug().WithField("key", key).Log("StorageExits")
_, _, err := s.kvs.GetKV(s.prefixKey(key)) _, _, err := s.kvs.GetKV(s.prefixKey(key))
return err == nil return err == nil
} }
@@ -113,6 +132,7 @@ func (s *clusterStorage) Exists(ctx context.Context, key string) bool {
// should be walked); otherwise, only keys // should be walked); otherwise, only keys
// prefixed exactly by prefix will be listed. // prefixed exactly by prefix will be listed.
func (s *clusterStorage) List(ctx context.Context, prefix string, recursive bool) ([]string, error) { func (s *clusterStorage) List(ctx context.Context, prefix string, recursive bool) ([]string, error) {
s.logger.Debug().WithField("prefix", prefix).Log("StorageList")
values := s.kvs.ListKV(s.prefixKey(prefix)) values := s.kvs.ListKV(s.prefixKey(prefix))
keys := []string{} keys := []string{}
@@ -147,6 +167,7 @@ func (s *clusterStorage) List(ctx context.Context, prefix string, recursive bool
// Stat returns information about key. // Stat returns information about key.
func (s *clusterStorage) Stat(ctx context.Context, key string) (certmagic.KeyInfo, error) { func (s *clusterStorage) Stat(ctx context.Context, key string) (certmagic.KeyInfo, error) {
s.logger.Debug().WithField("key", key).Log("StorageStat")
encodedValue, lastModified, err := s.kvs.GetKV(s.prefixKey(key)) encodedValue, lastModified, err := s.kvs.GetKV(s.prefixKey(key))
if err != nil { if err != nil {
return certmagic.KeyInfo{}, err return certmagic.KeyInfo{}, err

View File

@@ -17,7 +17,7 @@ func setupStorage() (certmagic.Storage, error) {
return nil, err return nil, err
} }
return NewClusterStorage(kvs, "some_prefix") return NewClusterStorage(kvs, "some_prefix", nil)
} }
func TestStorageStore(t *testing.T) { func TestStorageStore(t *testing.T) {