Add experimental LE certificate retrieval for cluster

This commit is contained in:
Ingo Oppermann
2023-06-26 20:38:16 +02:00
parent ddb18a8c3c
commit a4b0c4fc36
8 changed files with 460 additions and 124 deletions

View File

@@ -17,6 +17,7 @@ import (
"time" "time"
"github.com/datarhei/core/v16/app" "github.com/datarhei/core/v16/app"
"github.com/datarhei/core/v16/autocert"
"github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/config"
configstore "github.com/datarhei/core/v16/config/store" configstore "github.com/datarhei/core/v16/config/store"
@@ -53,7 +54,6 @@ import (
"github.com/caddyserver/certmagic" "github.com/caddyserver/certmagic"
"github.com/lestrrat-go/strftime" "github.com/lestrrat-go/strftime"
"go.uber.org/automaxprocs/maxprocs" "go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
) )
// The API interface is the implementation for the restreamer API. // The API interface is the implementation for the restreamer API.
@@ -473,7 +473,10 @@ func (a *api) start() error {
}) })
} }
cluster, err := cluster.New(cluster.Config{ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cluster, err := cluster.New(ctx, cluster.Config{
ID: cfg.ID, ID: cfg.ID,
Name: cfg.Name, Name: cfg.Name,
Path: filepath.Join(cfg.DB.Dir, "cluster"), Path: filepath.Join(cfg.DB.Dir, "cluster"),
@@ -482,7 +485,7 @@ func (a *api) start() error {
SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second, SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second,
NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second, NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second, EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
Config: cfg.Clone(), CoreConfig: cfg.Clone(),
CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), CoreAPIAddress: scheme + gonet.JoinHostPort(host, port),
CoreAPIUsername: cfg.API.Auth.Username, CoreAPIUsername: cfg.API.Auth.Username,
CoreAPIPassword: cfg.API.Auth.Password, CoreAPIPassword: cfg.API.Auth.Password,
@@ -496,6 +499,54 @@ func (a *api) start() error {
a.cluster = cluster a.cluster = cluster
} }
var autocertManager autocert.Manager
if cfg.TLS.Enable {
if cfg.TLS.Auto {
if len(cfg.Host.Name) == 0 {
return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
}
if a.cluster == nil {
manager, err := autocert.New(autocert.Config{
Storage: &certmagic.FileStorage{
Path: filepath.Join(cfg.DB.Dir, "cert"),
},
DefaultHostname: cfg.Host.Name[0],
EmailAddress: cfg.TLS.Email,
IsProduction: false,
Logger: a.log.logger.core.WithComponent("Let's Encrypt"),
})
if err != nil {
return fmt.Errorf("failed to initialize autocert manager: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
err = manager.AcquireCertificates(ctx, cfg.Address, cfg.Host.Name)
cancel()
autocertManager = manager
if err != nil {
a.log.logger.core.Error().WithError(err).Log("Failed to acquire certificates")
}
if err != nil {
a.log.logger.core.Warn().Log("Continuing with disabled TLS")
autocertManager = nil
cfg.TLS.Enable = false
} else {
cfg.TLS.CertFile = ""
cfg.TLS.KeyFile = ""
}
} else {
autocertManager = a.cluster.CertManager()
}
} else {
a.log.logger.core.Info().Log("Enabling TLS with cert and key files")
}
}
{ {
superuser := iamidentity.User{ superuser := iamidentity.User{
Name: cfg.API.Auth.Username, Name: cfg.API.Auth.Username,
@@ -1222,108 +1273,6 @@ func (a *api) start() error {
a.cache = cache a.cache = cache
} }
var autocertManager *certmagic.Config
if cfg.TLS.Enable {
if cfg.TLS.Auto {
if len(cfg.Host.Name) == 0 {
return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
}
certmagic.Default.Storage = &certmagic.FileStorage{
Path: filepath.Join(cfg.DB.Dir, "cert"),
}
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
certmagic.Default.Logger = zap.NewNop()
certmagic.DefaultACME.Agreed = true
certmagic.DefaultACME.Email = cfg.TLS.Email
certmagic.DefaultACME.CA = certmagic.LetsEncryptProductionCA
certmagic.DefaultACME.DisableHTTPChallenge = false
certmagic.DefaultACME.DisableTLSALPNChallenge = true
certmagic.DefaultACME.Logger = zap.NewNop()
magic := certmagic.NewDefault()
acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
acme.Logger = zap.NewNop()
magic.Issuers = []certmagic.Issuer{acme}
magic.Logger = zap.NewNop()
autocertManager = magic
// Start temporary http server on configured port
tempserver := &gohttp.Server{
Addr: cfg.Address,
Handler: acme.HTTPChallengeHandler(gohttp.HandlerFunc(func(w gohttp.ResponseWriter, r *gohttp.Request) {
w.WriteHeader(gohttp.StatusNotFound)
})),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
tempserver.ListenAndServe()
wg.Done()
}()
var certerror bool
// For each domain, get the certificate
for _, host := range cfg.Host.Name {
logger := a.log.logger.core.WithComponent("Let's Encrypt").WithField("host", host)
logger.Info().Log("Acquiring certificate ...")
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Minute))
err := autocertManager.ManageSync(ctx, []string{host})
cancel()
if err != nil {
logger.Error().WithField("error", err).Log("Failed to acquire certificate")
certerror = true
/*
problems, err := letsdebug.Check(host, letsdebug.HTTP01)
if err != nil {
logger.Error().WithField("error", err).Log("Failed to debug certificate acquisition")
}
for _, p := range problems {
logger.Error().WithFields(log.Fields{
"name": p.Name,
"detail": p.Detail,
}).Log(p.Explanation)
}
*/
break
}
logger.Info().Log("Successfully acquired certificate")
}
// Shut down the temporary http server
tempserver.Close()
wg.Wait()
if certerror {
a.log.logger.core.Warn().Log("Continuing with disabled TLS")
autocertManager = nil
cfg.TLS.Enable = false
} else {
cfg.TLS.CertFile = ""
cfg.TLS.KeyFile = ""
}
} else {
a.log.logger.core.Info().Log("Enabling TLS with cert and key files")
}
}
if cfg.RTMP.Enable { if cfg.RTMP.Enable {
a.log.logger.rtmp = a.log.logger.core.WithComponent("RTMP").WithField("address", cfg.RTMP.Address) a.log.logger.rtmp = a.log.logger.core.WithComponent("RTMP").WithField("address", cfg.RTMP.Address)
@@ -1559,9 +1508,7 @@ func (a *api) start() error {
a.mainserver.TLSConfig = &tls.Config{ a.mainserver.TLSConfig = &tls.Config{
GetCertificate: autocertManager.GetCertificate, GetCertificate: autocertManager.GetCertificate,
} }
a.sidecarserver.Handler = autocertManager.HTTPChallengeHandler(sidecarserverhandler)
acme := autocertManager.Issuers[0].(*certmagic.ACMEIssuer)
a.sidecarserver.Handler = acme.HTTPChallengeHandler(sidecarserverhandler)
} }
wgStart.Add(1) wgStart.Add(1)

126
autocert/autocert.go Normal file
View File

@@ -0,0 +1,126 @@
package autocert
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/log"
"github.com/caddyserver/certmagic"
"go.uber.org/zap"
)
// Manager acquires and serves TLS certificates from an ACME CA
// (Let's Encrypt). It plugs into a tls.Config via GetCertificate and into
// an HTTP server via HTTPChallengeHandler to answer HTTP-01 challenges.
type Manager interface {
	// AcquireCertificates obtains certificates for all hostnames, using a
	// temporary HTTP server on listenAddress to answer the HTTP-01
	// challenges. The context bounds the whole acquisition.
	AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error

	// GetCertificate returns a certificate for the requested server name.
	// It matches the signature of tls.Config.GetCertificate.
	GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error)

	// HTTPChallengeHandler wraps h such that ACME HTTP-01 challenge
	// requests are intercepted; all other requests are passed through to h.
	HTTPChallengeHandler(h http.Handler) http.Handler
}
// Config is the configuration for creating a certificate Manager.
type Config struct {
	// Storage is where certmagic persists certificates and ACME account
	// data (e.g. a file system or a cluster KV store).
	Storage certmagic.Storage

	// DefaultHostname is the server name to assume when a TLS client
	// doesn't send an SNI value.
	DefaultHostname string

	// EmailAddress is the account email registered with the ACME CA.
	EmailAddress string

	// IsProduction selects the Let's Encrypt production CA; otherwise the
	// staging CA is used (untrusted certificates, relaxed rate limits).
	IsProduction bool

	// Logger is the logger to use. If nil, a default logger is created.
	Logger log.Logger
}
// manager is the certmagic-backed implementation of the Manager interface.
type manager struct {
	config *certmagic.Config // configured certmagic instance doing the actual work
	logger log.Logger        // never nil after New
}
// New returns a Manager that acquires certificates from Let's Encrypt via
// the ACME HTTP-01 challenge, backed by certmagic.
//
// NOTE(review): this mutates the package-global certmagic.Default and
// certmagic.DefaultACME before calling certmagic.NewDefault(), so creating
// several managers concurrently, or with different configs, is presumably
// not safe — confirm before adding a second call site.
func New(config Config) (Manager, error) {
	m := &manager{
		logger: config.Logger,
	}

	if m.logger == nil {
		m.logger = log.New("")
	}

	// Configure the package-level defaults that certmagic.NewDefault()
	// below will pick up for the returned *certmagic.Config.
	certmagic.Default.Storage = config.Storage
	certmagic.Default.DefaultServerName = config.DefaultHostname
	certmagic.Default.Logger = zap.NewNop()

	// Use the staging CA unless production is explicitly requested.
	ca := certmagic.LetsEncryptStagingCA
	if config.IsProduction {
		ca = certmagic.LetsEncryptProductionCA
	}

	certmagic.DefaultACME.Agreed = true
	certmagic.DefaultACME.Email = config.EmailAddress
	certmagic.DefaultACME.CA = ca
	// Only the HTTP-01 challenge is enabled; TLS-ALPN is disabled.
	certmagic.DefaultACME.DisableHTTPChallenge = false
	certmagic.DefaultACME.DisableTLSALPNChallenge = true
	certmagic.DefaultACME.Logger = zap.NewNop()

	magic := certmagic.NewDefault()
	acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
	acme.Logger = zap.NewNop()

	// The ACME issuer is the only issuer; HTTPChallengeHandler and
	// AcquireCertificates rely on it being at Issuers[0].
	magic.Issuers = []certmagic.Issuer{acme}
	magic.Logger = zap.NewNop()

	m.config = magic

	return m, nil
}
// HTTPChallengeHandler wraps h so that ACME HTTP-01 challenge requests are
// answered by the ACME issuer, while all other requests fall through to h.
func (m *manager) HTTPChallengeHandler(h http.Handler) http.Handler {
	issuer := m.config.Issuers[0].(*certmagic.ACMEIssuer)
	return issuer.HTTPChallengeHandler(h)
}
// GetCertificate returns the certificate for the server name in hello,
// delegating to certmagic. It matches the signature expected by
// tls.Config.GetCertificate.
func (m *manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error) {
	return m.config.GetCertificate(hello)
}
// AcquireCertificates obtains TLS certificates for the given hostnames via
// the ACME HTTP-01 challenge. It starts a temporary HTTP server on
// listenAddress to answer the challenge requests, asks certmagic to obtain
// (or renew) certificates for all hostnames, and then shuts the temporary
// server down again. The context bounds the whole acquisition.
func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, hostnames []string) error {
	acme := m.config.Issuers[0].(*certmagic.ACMEIssuer)

	// Start a temporary http server on the configured port in order to
	// answer the HTTP-01 challenge requests from the CA. Anything that is
	// not a challenge request gets a 404.
	tempserver := &http.Server{
		Addr: listenAddress,
		Handler: acme.HTTPChallengeHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusNotFound)
		})),
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}

	wg := sync.WaitGroup{}
	wg.Add(1)

	// Capture a failure to bind/serve; written only by the goroutine and
	// read only after wg.Wait(), so there's no race.
	var listenerr error

	go func() {
		defer wg.Done()

		// ErrServerClosed is the expected outcome of tempserver.Close()
		// below; anything else means the challenge server never ran.
		if err := tempserver.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			listenerr = err
		}
	}()

	var certerr error

	// Get the certificates for all hostnames at once.
	logger := m.logger.WithField("hostnames", hostnames)
	logger.Info().Log("Acquiring certificate ...")

	err := m.config.ManageSync(ctx, hostnames)
	if err != nil {
		certerr = fmt.Errorf("failed to acquire certificate for %s: %w", strings.Join(hostnames, ","), err)
	} else {
		logger.Info().Log("Successfully acquired certificate")
	}

	// Shut down the temporary http server
	tempserver.Close()

	wg.Wait()

	if certerr != nil && listenerr != nil {
		// A challenge server that failed to start is the likely root
		// cause of the failed acquisition; surface both errors.
		return fmt.Errorf("%w (challenge server: %v)", certerr, listenerr)
	}

	return certerr
}

View File

@@ -104,6 +104,21 @@ func NewAPI(config APIConfig) (API, error) {
return c.JSON(http.StatusOK, Version.String()) return c.JSON(http.StatusOK, Version.String())
}) })
a.router.GET("/v1/ready", func(c echo.Context) error {
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
err := a.cluster.IsReady(origin)
if err != nil {
return Err(http.StatusLocked, "", "not ready yet")
}
return c.JSON(http.StatusOK, "OK")
})
a.router.POST("/v1/server", a.AddServer) a.router.POST("/v1/server", a.AddServer)
a.router.DELETE("/v1/server/:id", a.RemoveServer) a.router.DELETE("/v1/server/:id", a.RemoveServer)

View File

@@ -253,6 +253,12 @@ func (c *APIClient) Snapshot() (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "") return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "")
} }
func (c *APIClient) IsReady(origin string) error {
_, err := c.call(http.MethodGet, "/v1/ready", "application/json", nil, origin)
return err
}
func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) { func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) {
if len(c.Address) == 0 { if len(c.Address) == 0 {
return nil, fmt.Errorf("no address defined") return nil, fmt.Errorf("no address defined")

View File

@@ -8,10 +8,12 @@ import (
"io" "io"
gonet "net" gonet "net"
"net/url" "net/url"
"sort"
"strconv" "strconv"
"sync" "sync"
"time" "time"
"github.com/datarhei/core/v16/autocert"
apiclient "github.com/datarhei/core/v16/cluster/client" apiclient "github.com/datarhei/core/v16/cluster/client"
"github.com/datarhei/core/v16/cluster/forwarder" "github.com/datarhei/core/v16/cluster/forwarder"
clusteriam "github.com/datarhei/core/v16/cluster/iam" clusteriam "github.com/datarhei/core/v16/cluster/iam"
@@ -42,6 +44,7 @@ type Cluster interface {
CoreConfig() *config.Config CoreConfig() *config.Config
About() (ClusterAbout, error) About() (ClusterAbout, error)
IsReady(origin string) error
Join(origin, id, raftAddress, peerAddress string) error Join(origin, id, raftAddress, peerAddress string) error
Leave(origin, id string) error // gracefully remove a node from the cluster Leave(origin, id string) error // gracefully remove a node from the cluster
@@ -77,6 +80,7 @@ type Cluster interface {
ListKV(prefix string) map[string]store.Value ListKV(prefix string) map[string]store.Value
ProxyReader() proxy.ProxyReader ProxyReader() proxy.ProxyReader
CertManager() autocert.Manager
} }
type Peer struct { type Peer struct {
@@ -99,7 +103,7 @@ type Config struct {
CoreAPIUsername string // Username for the core API CoreAPIUsername string // Username for the core API
CoreAPIPassword string // Password for the core API CoreAPIPassword string // Password for the core API
Config *config.Config CoreConfig *config.Config
IPLimiter net.IPLimiter IPLimiter net.IPLimiter
Logger log.Logger Logger log.Logger
@@ -138,22 +142,29 @@ type cluster struct {
config *config.Config config *config.Config
coreAddress string coreAddress string
isDegraded bool isDegraded bool
isDegradedErr error isDegradedErr error
stateLock sync.Mutex isCoreDegraded bool
isCoreDegradedErr error
stateLock sync.Mutex
isRaftLeader bool isRaftLeader bool
hasRaftLeader bool hasRaftLeader bool
isLeader bool isLeader bool
leaderLock sync.Mutex leaderLock sync.Mutex
isTLSRequired bool
certManager autocert.Manager
nodes map[string]*clusterNode nodes map[string]*clusterNode
nodesLock sync.RWMutex nodesLock sync.RWMutex
ready bool
} }
var ErrDegraded = errors.New("cluster is currently degraded") var ErrDegraded = errors.New("cluster is currently degraded")
func New(config Config) (Cluster, error) { func New(ctx context.Context, config Config) (Cluster, error) {
c := &cluster{ c := &cluster{
id: config.ID, id: config.ID,
name: config.Name, name: config.Name,
@@ -169,7 +180,7 @@ func New(config Config) (Cluster, error) {
nodeRecoverTimeout: config.NodeRecoverTimeout, nodeRecoverTimeout: config.NodeRecoverTimeout,
emergencyLeaderTimeout: config.EmergencyLeaderTimeout, emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
config: config.Config, config: config.CoreConfig,
nodes: map[string]*clusterNode{}, nodes: map[string]*clusterNode{},
} }
@@ -177,6 +188,8 @@ func New(config Config) (Cluster, error) {
return nil, fmt.Errorf("the core config must be provided") return nil, fmt.Errorf("the core config must be provided")
} }
c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto
u, err := url.Parse(config.CoreAPIAddress) u, err := url.Parse(config.CoreAPIAddress)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid core API address: %w", err) return nil, fmt.Errorf("invalid core API address: %w", err)
@@ -312,6 +325,112 @@ func New(config Config) (Cluster, error) {
go c.monitorLeadership() go c.monitorLeadership()
go c.sentinel() go c.sentinel()
// Wait for a leader to be selected
c.logger.Info().Log("Waiting for a leader to be elected ...")
for {
leader := c.raft.Leader()
if len(leader) != 0 {
break
}
select {
case <-ctx.Done():
c.Shutdown()
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default:
}
time.Sleep(100 * time.Millisecond)
}
c.logger.Info().Log("Leader has been elected")
// Wait for cluster to leave degraded mode
c.logger.Info().Log("Waiting for cluster to become operational ...")
for {
ok, _ := c.IsClusterDegraded()
if !ok {
break
}
select {
case <-ctx.Done():
c.Shutdown()
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default:
}
time.Sleep(100 * time.Millisecond)
}
c.logger.Info().Log("Cluster is operational")
if c.isTLSRequired && c.IsRaftLeader() {
names, err := c.getClusterHostnames()
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err)
}
kvs, err := NewClusterKVS(c)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("cluster KVS: %w", err)
}
storage, err := NewClusterStorage(kvs, "core-cluster-certificates")
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate store: %w", err)
}
manager, err := autocert.New(autocert.Config{
Storage: storage,
DefaultHostname: names[0],
EmailAddress: c.config.TLS.Email,
IsProduction: false,
Logger: c.logger.WithComponent("Let's Encrypt"),
})
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate manager: %w", err)
}
c.certManager = manager
err = manager.AcquireCertificates(ctx, c.config.Address, names)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to acquire certificates: %w", err)
}
}
if !c.IsRaftLeader() {
for {
err := c.IsReady("")
if err == nil {
break
}
select {
case <-ctx.Done():
c.Shutdown()
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default:
}
time.Sleep(time.Second)
}
}
c.ready = true
c.logger.Info().Log("Cluster is ready")
return c, nil return c, nil
} }
@@ -368,6 +487,10 @@ func (c *cluster) CoreConfig() *config.Config {
return c.config.Clone() return c.config.Clone()
} }
func (c *cluster) CertManager() autocert.Manager {
return c.certManager
}
func (c *cluster) Shutdown() error { func (c *cluster) Shutdown() error {
c.logger.Info().Log("Shutting down cluster") c.logger.Info().Log("Shutting down cluster")
c.shutdownLock.Lock() c.shutdownLock.Lock()
@@ -417,6 +540,17 @@ func (c *cluster) IsDegraded() (bool, error) {
c.stateLock.Lock() c.stateLock.Lock()
defer c.stateLock.Unlock() defer c.stateLock.Unlock()
if c.isDegraded {
return c.isDegraded, c.isDegradedErr
}
return c.isCoreDegraded, c.isCoreDegradedErr
}
func (c *cluster) IsClusterDegraded() (bool, error) {
c.stateLock.Lock()
defer c.stateLock.Unlock()
return c.isDegraded, c.isDegradedErr return c.isDegraded, c.isDegradedErr
} }
@@ -701,6 +835,19 @@ func (c *cluster) trackNodeChanges() {
c.isDegradedErr = nil c.isDegradedErr = nil
} }
c.stateLock.Unlock() c.stateLock.Unlock()
// Put the cluster in "coreDegraded" mode in case there's a mismatch in expected values
err = c.checkClusterCoreNodes()
c.stateLock.Lock()
if err != nil {
c.isCoreDegraded = true
c.isCoreDegradedErr = err
} else {
c.isCoreDegraded = false
c.isCoreDegradedErr = nil
}
c.stateLock.Unlock()
case <-c.shutdownCh: case <-c.shutdownCh:
return return
} }
@@ -733,6 +880,47 @@ func (c *cluster) checkClusterNodes() error {
return nil return nil
} }
func (c *cluster) checkClusterCoreNodes() error {
c.nodesLock.RLock()
defer c.nodesLock.RUnlock()
for id, node := range c.nodes {
if status, err := node.CoreStatus(); status == "offline" {
return fmt.Errorf("node %s core is offline: %w", id, err)
}
}
return nil
}
func (c *cluster) getClusterHostnames() ([]string, error) {
hostnames := map[string]struct{}{}
c.nodesLock.RLock()
defer c.nodesLock.RUnlock()
for id, node := range c.nodes {
config, err := node.CoreConfig()
if err != nil {
return nil, fmt.Errorf("node %s has no configuration available: %w", id, err)
}
for _, name := range config.Host.Name {
hostnames[name] = struct{}{}
}
}
names := []string{}
for key := range hostnames {
names = append(names, key)
}
sort.Strings(names)
return names, nil
}
func verifyClusterVersion(v string) error { func verifyClusterVersion(v string) error {
version, err := ParseClusterVersion(v) version, err := ParseClusterVersion(v)
if err != nil { if err != nil {
@@ -771,16 +959,20 @@ func verifyClusterConfig(local, remote *config.Config) error {
return fmt.Errorf("rtmp.enable is different") return fmt.Errorf("rtmp.enable is different")
} }
if local.RTMP.App != remote.RTMP.App { if local.RTMP.Enable {
return fmt.Errorf("rtmp.app is different") if local.RTMP.App != remote.RTMP.App {
return fmt.Errorf("rtmp.app is different")
}
} }
if local.SRT.Enable != remote.SRT.Enable { if local.SRT.Enable != remote.SRT.Enable {
return fmt.Errorf("srt.enable is different") return fmt.Errorf("srt.enable is different")
} }
if local.SRT.Passphrase != remote.SRT.Passphrase { if local.SRT.Enable {
return fmt.Errorf("srt.passphrase is different") if local.SRT.Passphrase != remote.SRT.Passphrase {
return fmt.Errorf("srt.passphrase is different")
}
} }
if local.Resources.MaxCPUUsage == 0 || remote.Resources.MaxCPUUsage == 0 { if local.Resources.MaxCPUUsage == 0 || remote.Resources.MaxCPUUsage == 0 {
@@ -791,6 +983,20 @@ func verifyClusterConfig(local, remote *config.Config) error {
return fmt.Errorf("resources.max_memory_usage must be defined") return fmt.Errorf("resources.max_memory_usage must be defined")
} }
if local.TLS.Enable != remote.TLS.Enable {
return fmt.Errorf("tls.enable is different")
}
if local.TLS.Enable {
if local.TLS.Auto != remote.TLS.Auto {
return fmt.Errorf("tls.auto is different")
}
if len(local.Host.Name) == 0 || len(remote.Host.Name) == 0 {
return fmt.Errorf("host.name must be set")
}
}
return nil return nil
} }
@@ -1085,7 +1291,7 @@ func (c *cluster) RemoveIdentity(origin string, name string) error {
} }
func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*Lock, error) { func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*Lock, error) {
if ok, _ := c.IsDegraded(); ok { if ok, _ := c.IsClusterDegraded(); ok {
return nil, ErrDegraded return nil, ErrDegraded
} }
@@ -1123,7 +1329,7 @@ func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (
} }
func (c *cluster) DeleteLock(origin string, name string) error { func (c *cluster) DeleteLock(origin string, name string) error {
if ok, _ := c.IsDegraded(); ok { if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -1146,7 +1352,7 @@ func (c *cluster) ListLocks() map[string]time.Time {
} }
func (c *cluster) SetKV(origin, key, value string) error { func (c *cluster) SetKV(origin, key, value string) error {
if ok, _ := c.IsDegraded(); ok { if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -1166,7 +1372,7 @@ func (c *cluster) SetKV(origin, key, value string) error {
} }
func (c *cluster) UnsetKV(origin, key string) error { func (c *cluster) UnsetKV(origin, key string) error {
if ok, _ := c.IsDegraded(); ok { if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded return ErrDegraded
} }
@@ -1199,6 +1405,22 @@ func (c *cluster) ListKV(prefix string) map[string]store.Value {
return storeValues return storeValues
} }
func (c *cluster) IsReady(origin string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.IsReady(origin)
}
if !c.ready {
return fmt.Errorf("not ready yet")
}
return nil
}
func (c *cluster) applyCommand(cmd *store.Command) error { func (c *cluster) applyCommand(cmd *store.Command) error {
b, err := json.Marshal(cmd) b, err := json.Marshal(cmd)
if err != nil { if err != nil {

View File

@@ -38,6 +38,8 @@ type Forwarder interface {
SetKV(origin, key, value string) error SetKV(origin, key, value string) error
UnsetKV(origin, key string) error UnsetKV(origin, key string) error
IsReady(origin string) error
} }
type forwarder struct { type forwarder struct {
@@ -339,3 +341,15 @@ func (f *forwarder) UnsetKV(origin, key string) error {
return client.UnsetKV(origin, key) return client.UnsetKV(origin, key)
} }
func (f *forwarder) IsReady(origin string) error {
if origin == "" {
origin = f.id
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.IsReady(origin)
}

View File

@@ -137,7 +137,14 @@ func (n *clusterNode) Status() (string, error) {
return "offline", fmt.Errorf("the cluster API didn't respond for %s because: %w", since, n.lastContactErr) return "offline", fmt.Errorf("the cluster API didn't respond for %s because: %w", since, n.lastContactErr)
} }
since = time.Since(n.lastCoreContact) return "online", nil
}
func (n *clusterNode) CoreStatus() (string, error) {
n.pingLock.RLock()
defer n.pingLock.RUnlock()
since := time.Since(n.lastCoreContact)
if since > 5*time.Second { if since > 5*time.Second {
return "offline", fmt.Errorf("the core API didn't respond for %s because: %w", since, n.lastCoreContactErr) return "offline", fmt.Errorf("the core API didn't respond for %s because: %w", since, n.lastCoreContactErr)
} }

View File

@@ -488,7 +488,6 @@ func (r *raft) trackLeaderChanges() {
obsCh := make(chan hcraft.Observation, 16) obsCh := make(chan hcraft.Observation, 16)
observer := hcraft.NewObserver(obsCh, false, func(o *hcraft.Observation) bool { observer := hcraft.NewObserver(obsCh, false, func(o *hcraft.Observation) bool {
_, leaderOK := o.Data.(hcraft.LeaderObservation) _, leaderOK := o.Data.(hcraft.LeaderObservation)
return leaderOK return leaderOK
}) })