From dd128ac99b773d3fbdeefbe331fbcbfe03a6e9a7 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 27 Jun 2023 14:52:32 +0200 Subject: [PATCH] Fix gathering of hostnames, add /v3/cluster/db/kv endpoint --- app/api/api.go | 31 +------- cluster/cluster.go | 78 +++++++++++++++---- cluster/kvs.go | 20 ++++- cluster/proxy/node.go | 2 +- config/config.go | 2 +- docs/docs.go | 43 ++++++++++ docs/swagger.json | 43 ++++++++++ docs/swagger.yaml | 27 +++++++ go.mod | 2 +- go.sum | 2 + http/api/cluster.go | 7 ++ http/handler/api/cluster.go | 24 ++++++ http/server.go | 1 + .../datarhei/core-client-go/v16/client.go | 2 + vendor/modules.txt | 2 +- 15 files changed, 234 insertions(+), 52 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 7d9719e4..b83f7cc3 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -433,32 +433,6 @@ func (a *api) start(ctx context.Context) error { } if cfg.Cluster.Enable { - scheme := "http://" - address := cfg.Address - - if cfg.TLS.Enable { - scheme = "https://" - address = cfg.TLS.Address - } - - host, port, err := gonet.SplitHostPort(address) - if err != nil { - return fmt.Errorf("invalid core address: %s: %w", address, err) - } - - if len(host) == 0 { - chost, _, err := gonet.SplitHostPort(cfg.Cluster.Address) - if err != nil { - return fmt.Errorf("invalid cluster address: %s: %w", cfg.Cluster.Address, err) - } - - if len(chost) == 0 { - return fmt.Errorf("invalid cluster address: %s: %w", cfg.Cluster.Address, err) - } - - host = chost - } - peers := []cluster.Peer{} for _, p := range cfg.Cluster.Peers { @@ -486,9 +460,6 @@ func (a *api) start(ctx context.Context) error { NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second, EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second, CoreConfig: cfg.Clone(), - CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), - CoreAPIUsername: cfg.API.Auth.Username, - CoreAPIPassword: cfg.API.Auth.Password, IPLimiter: a.sessionsLimiter, Logger: a.log.logger.core.WithComponent("Cluster"), }) @@ -531,7 +502,7 @@ func (a *api) start(ctx context.Context) error { a.log.logger.core.Error().WithError(err).Log("Failed to acquire certificates") } - if err == nil { + if err != nil { a.log.logger.core.Warn().Log("Continuing with disabled TLS") autocertManager = nil cfg.TLS.Enable = false diff --git a/cluster/cluster.go b/cluster/cluster.go index 02400087..b455b05c 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -99,10 +99,6 @@ type Config struct { NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader - CoreAPIAddress string // Address of the core API - CoreAPIUsername string // Username for the core API - CoreAPIPassword string // Password for the core API - CoreConfig *config.Config IPLimiter net.IPLimiter @@ -180,6 +176,12 @@ func New(ctx context.Context, config Config) (Cluster, error) { nodeRecoverTimeout: config.NodeRecoverTimeout, emergencyLeaderTimeout: config.EmergencyLeaderTimeout, + isDegraded: true, + isDegradedErr: fmt.Errorf("cluster not yet startet"), + + isCoreDegraded: true, + isCoreDegradedErr: fmt.Errorf("cluster not yet started"), + config: config.CoreConfig, nodes: map[string]*clusterNode{}, } @@ -190,15 +192,34 @@ func New(ctx context.Context, config Config) (Cluster, error) { c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto - u, err := url.Parse(config.CoreAPIAddress) + host, port, err := gonet.SplitHostPort(c.config.Address) if err != nil { - return nil, fmt.Errorf("invalid core API address: %w", err) + return nil, fmt.Errorf("invalid core address: %s: %w", c.config.Address, err) } - if len(config.CoreAPIPassword) == 0 { - u.User = url.User(config.CoreAPIUsername) + chost, _, err := gonet.SplitHostPort(c.config.Cluster.Address) + if err != nil { + return nil, fmt.Errorf("invalid cluster address: %s: %w", c.config.Cluster.Address, err) + } + + if len(chost) == 0 { + return nil, fmt.Errorf("invalid cluster address: %s: %w", c.config.Cluster.Address, err) + } + + if len(host) == 0 { + host = chost + } + + u := &url.URL{ + Scheme: "http", + Host: gonet.JoinHostPort(host, port), + Path: "/", + } + + if len(c.config.API.Auth.Password) == 0 { + u.User = url.User(c.config.API.Auth.Username) } else { - u.User = url.UserPassword(config.CoreAPIUsername, config.CoreAPIPassword) + u.User = url.UserPassword(c.config.API.Auth.Username, c.config.API.Auth.Password) } c.coreAddress = u.String() @@ -375,19 +396,24 @@ func New(ctx context.Context, config Config) (Cluster, error) { names, err := c.getClusterHostnames() if err != nil { c.Shutdown() - return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err) + return nil, fmt.Errorf("tls: failed to assemble list of all configured hostnames: %w", err) } - kvs, err := NewClusterKVS(c) + if len(names) == 0 { + c.Shutdown() + return nil, fmt.Errorf("tls: no hostnames are configured") + } + + kvs, err := NewClusterKVS(c, c.logger.WithComponent("KVS")) if err != nil { c.Shutdown() - return nil, fmt.Errorf("cluster KVS: %w", err) + return nil, fmt.Errorf("tls: cluster KVS: %w", err) } storage, err := NewClusterStorage(kvs, "core-cluster-certificates") if err != nil { c.Shutdown() - return nil, fmt.Errorf("certificate store: %w", err) + return nil, fmt.Errorf("tls: certificate store: %w", err) } manager, err := autocert.New(autocert.Config{ @@ -399,7 +425,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { }) if err != nil { c.Shutdown() - return nil, fmt.Errorf("certificate manager: %w", err) + return nil, fmt.Errorf("tls: certificate manager: %w", err) } c.certManager = manager @@ -407,7 +433,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { err = manager.AcquireCertificates(ctx, c.config.Address, names) if err != nil { c.Shutdown() - return nil, fmt.Errorf("failed to acquire certificates: %w", err) + return nil, fmt.Errorf("tls: failed to acquire certificates: %w", err) } } } @@ -585,9 +611,27 @@ func (c *cluster) IsDegraded() (bool, error) { func (c *cluster) IsClusterDegraded() (bool, error) { c.stateLock.Lock() - defer c.stateLock.Unlock() + isDegraded, isDegradedErr := c.isDegraded, c.isDegradedErr + c.stateLock.Unlock() - return c.isDegraded, c.isDegradedErr + if isDegraded { + return isDegraded, isDegradedErr + } + + servers, err := c.raft.Servers() + if err != nil { + return true, err + } + + c.nodesLock.Lock() + nodes := len(c.nodes) + c.nodesLock.Unlock() + + if len(servers) != nodes { + return true, fmt.Errorf("not all nodes are connected") + } + + return false, nil } func (c *cluster) Leave(origin, id string) error { diff --git a/cluster/kvs.go b/cluster/kvs.go index 12ce8e8c..12b499a3 100644 --- a/cluster/kvs.go +++ b/cluster/kvs.go @@ -9,6 +9,7 @@ import ( "time" "github.com/datarhei/core/v16/cluster/store" + "github.com/datarhei/core/v16/log" ) type KVS interface { @@ -62,21 +63,32 @@ func (l *Lock) Unlock() { type clusterKVS struct { cluster Cluster + logger log.Logger } -func NewClusterKVS(cluster Cluster) (KVS, error) { +func NewClusterKVS(cluster Cluster, logger log.Logger) (KVS, error) { s := &clusterKVS{ cluster: cluster, + logger: logger, + } + + if s.logger == nil { + s.logger = log.New("") } return s, nil } func (s *clusterKVS) CreateLock(name string, validUntil time.Time) (*Lock, error) { + s.logger.Debug().WithFields(log.Fields{ + "name": name, + "valid_until": validUntil, + }).Log("Create lock") return s.cluster.CreateLock("", name, validUntil) } func (s *clusterKVS) DeleteLock(name string) error { + s.logger.Debug().WithField("name", name).Log("Delete lock") return s.cluster.DeleteLock("", name) } @@ -85,14 +97,20 @@ func (s *clusterKVS) ListLocks() map[string]time.Time { } func (s *clusterKVS) SetKV(key, value string) error { + s.logger.Debug().WithFields(log.Fields{ + "key": key, + "value": value, + }).Log("Set KV") return s.cluster.SetKV("", key, value) } func (s *clusterKVS) UnsetKV(key string) error { + s.logger.Debug().WithField("key", key).Log("Unset KV") return s.cluster.UnsetKV("", key) } func (s *clusterKVS) GetKV(key string) (string, time.Time, error) { + s.logger.Debug().WithField("key", key).Log("Get KV") return s.cluster.GetKV(key) } diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 51f7e97f..9e67fd34 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -233,7 +233,7 @@ func (n *node) connect(ctx context.Context) error { } peer, err := client.New(client.Config{ - Address: n.address, + Address: u.String(), Client: &http.Client{ Timeout: 5 * time.Second, }, diff --git a/config/config.go b/config/config.go index 22e1913a..c866c49a 100644 --- a/config/config.go +++ b/config/config.go @@ -483,7 +483,7 @@ func (d *Config) Validate(resetLogs bool) { } } - // If cluster mode is enabled, we can't join and bootstrap at the same time + // If cluster mode is enabled, a proper address must be provided if d.Cluster.Enable { if len(d.Cluster.Address) == 0 { d.vars.Log("error", "cluster.address", "must be provided") diff --git a/docs/docs.go b/docs/docs.go index 4128d212..ddd92fde 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -154,6 +154,32 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/db/kv": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of KV in the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List KV in the cluster DB", + "operationId": "cluster-3-db-list-kv", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterKVS" + } + } + } + } + }, "/api/v3/cluster/db/locks": { "get": { "security": [ @@ -4174,6 +4200,23 @@ const docTemplate = `{ } } }, + "api.ClusterKVS": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/api.ClusterKVSValue" + } + }, + "api.ClusterKVSValue": { + "type": "object", + "properties": { + "updated_at": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "api.ClusterLock": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 874b450b..e3e6b87e 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -146,6 +146,32 @@ } } }, + "/api/v3/cluster/db/kv": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of KV in the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List KV in the cluster DB", + "operationId": "cluster-3-db-list-kv", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterKVS" + } + } + } + } + }, "/api/v3/cluster/db/locks": { "get": { "security": [ @@ -4166,6 +4192,23 @@ } } }, + "api.ClusterKVS": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/api.ClusterKVSValue" + } + }, + "api.ClusterKVSValue": { + "type": "object", + "properties": { + "updated_at": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "api.ClusterLock": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 9adbe75a..a73570be 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -88,6 +88,17 @@ definitions: version: type: string type: object + api.ClusterKVS: + additionalProperties: + $ref: '#/definitions/api.ClusterKVSValue' + type: object + api.ClusterKVSValue: + properties: + updated_at: + type: string + value: + type: string + type: object api.ClusterLock: properties: name: @@ -2467,6 +2478,22 @@ paths: summary: List of nodes in the cluster tags: - v16.?.? + /api/v3/cluster/db/kv: + get: + description: List of KV in the cluster DB + operationId: cluster-3-db-list-kv + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ClusterKVS' + security: + - ApiKeyAuth: [] + summary: List KV in the cluster DB + tags: + - v16.?.? /api/v3/cluster/db/locks: get: description: List of locks in the cluster DB diff --git a/go.mod b/go.mod index 91e26926..3388f217 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.18.0 github.com/casbin/casbin/v2 v2.71.1 - github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c + github.com/datarhei/core-client-go/v16 v16.11.1-0.20230627120001-16d06aa77802 github.com/datarhei/gosrt v0.5.2 github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/fujiwara/shapeio v1.0.0 diff --git a/go.sum b/go.sum index 66dcec52..c80b6dc0 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6 h1 github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c h1:WUjP8x7hl1EsTbV5w8WEhK6c8t3uRqLUSn7+FFHDpJU= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230627120001-16d06aa77802 h1:F4ILviOV6brxg25hMyzzyI0K/C9yLzYHgecPD1PaokQ= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230627120001-16d06aa77802/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc= github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= diff --git a/http/api/cluster.go b/http/api/cluster.go index 8220e581..2215264d 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -92,3 +92,10 @@ type ClusterLock struct { Name string `json:"name"` ValidUntil time.Time `json:"valid_until"` } + +type ClusterKVSValue struct { + Value string `json:"value"` + UpdatedAt time.Time `json:"updated_at"` +} + +type ClusterKVS map[string]ClusterKVSValue diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 2de64e3b..a9c307f0 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -1267,3 +1267,27 @@ func (h *ClusterHandler) ListStoreLocks(c echo.Context) error { return c.JSON(http.StatusOK, locks) } + +// ListStoreKV returns the list of currently stored key/value pairs +// @Summary List KV in the cluster DB +// @Description List of KV in the cluster DB +// @Tags v16.?.? +// @ID cluster-3-db-list-kv +// @Produce json +// @Success 200 {object} api.ClusterKVS +// @Security ApiKeyAuth +// @Router /api/v3/cluster/db/kv [get] +func (h *ClusterHandler) ListStoreKV(c echo.Context) error { + clusterkv := h.cluster.ListKV("") + + kvs := api.ClusterKVS{} + + for key, v := range clusterkv { + kvs[key] = api.ClusterKVSValue{ + Value: v.Value, + UpdatedAt: v.UpdatedAt, + } + } + + return c.JSON(http.StatusOK, kvs) +} diff --git a/http/server.go b/http/server.go index 0c320be7..a0f93fcd 100644 --- a/http/server.go +++ b/http/server.go @@ -680,6 +680,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/db/user/:name", s.v3handler.cluster.ListStoreIdentity) v3.GET("/cluster/db/policies", s.v3handler.cluster.ListStorePolicies) v3.GET("/cluster/db/locks", s.v3handler.cluster.ListStoreLocks) + v3.GET("/cluster/db/kv", s.v3handler.cluster.ListStoreKV) v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities) v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity) diff --git a/vendor/github.com/datarhei/core-client-go/v16/client.go b/vendor/github.com/datarhei/core-client-go/v16/client.go index 275bc277..d0c60757 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/client.go +++ b/vendor/github.com/datarhei/core-client-go/v16/client.go @@ -172,6 +172,8 @@ func New(config Config) (RestClient, error) { r.address = u.String() } + r.address = strings.TrimSuffix(r.address, "/") + if r.client == nil { r.client = &http.Client{ Timeout: 15 * time.Second, diff --git a/vendor/modules.txt b/vendor/modules.txt index 52d1406b..f8258781 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.2 ## explicit; go 1.11 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c +# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230627120001-16d06aa77802 ## explicit; go 1.18 github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16/api