Expose general info in cluster about, bump cluster API version to 2.0.0

Ingo Oppermann
2024-04-08 21:55:07 +02:00
parent 29848ab9df
commit 487529c598
11 changed files with 120 additions and 57 deletions

View File

@@ -530,7 +530,8 @@ func (a *api) start(ctx context.Context) error {
     }
     cluster, err := cluster.New(cluster.Config{
-        ID:      cfg.ID,
+        ID:      cfg.Cluster.ID,
+        NodeID:  cfg.ID,
         Name:    cfg.Name,
         Path:    filepath.Join(cfg.DB.Dir, "cluster"),
         Address: cfg.Cluster.Address,
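
This first hunk is the heart of the commit: cluster.Config now carries two identities, the shared cluster ID (taken from cfg.Cluster.ID) and the node's own ID (cfg.ID, which previously doubled as the only ID). A minimal sketch of the distinction, with illustrative values:

package main

import "fmt"

// Config mirrors just the two identity fields from this diff.
type Config struct {
    ID     string // ID of the cluster, identical on every node
    NodeID string // ID of this node, unique within the cluster
}

func main() {
    // Two nodes of the same cluster share ID but not NodeID.
    a := Config{ID: "cluster-1", NodeID: "node-a"}
    b := Config{ID: "cluster-1", NodeID: "node-b"}
    fmt.Println(a.ID == b.ID, a.NodeID == b.NodeID) // true false
}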

View File

@@ -30,6 +30,7 @@ import (
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/slices"
)
type Cluster interface {
@@ -102,7 +103,8 @@ type DebugConfig struct {
 }

 type Config struct {
-    ID      string // ID of the node
+    ID      string // ID of the cluster
+    NodeID  string // ID of the node
     Name    string // Name of the node
     Path    string // Path where to store all cluster data
     Address string // Listen address for the raft protocol
@@ -122,9 +124,10 @@ type Config struct {
 }

 type cluster struct {
-    id   string
-    name string
-    path string
+    id     string
+    nodeID string
+    name   string
+    path   string

     logger log.Logger
@@ -159,7 +162,8 @@ type cluster struct {
     isDegradedErr     error
     isCoreDegraded    bool
     isCoreDegradedErr error
-    stateLock         sync.Mutex
+    hostnames         []string
+    stateLock         sync.RWMutex

     isRaftLeader  bool
     hasRaftLeader bool
@@ -186,6 +190,7 @@ var ErrDegraded = errors.New("cluster is currently degraded")
 func New(config Config) (Cluster, error) {
     c := &cluster{
         id:     config.ID,
+        nodeID: config.NodeID,
         name:   config.Name,
         path:   config.Path,
         logger: config.Logger,
@@ -277,7 +282,7 @@ func New(config Config) (Cluster, error) {
     c.store = store

     api, err := NewAPI(APIConfig{
-        ID:      c.id,
+        ID:      c.nodeID,
         Cluster: c,
         Logger:  c.logger.WithField("logname", "api"),
     })
@@ -292,7 +297,7 @@ func New(config Config) (Cluster, error) {
     c.api = api

     nodeproxy, err := proxy.NewProxy(proxy.ProxyConfig{
-        ID:     c.id,
+        ID:     c.nodeID,
         Logger: c.logger.WithField("logname", "proxy"),
     })

     if err != nil {
@@ -307,7 +312,7 @@ func New(config Config) (Cluster, error) {
     c.proxy = nodeproxy

     if forwarder, err := forwarder.New(forwarder.ForwarderConfig{
-        ID:     c.id,
+        ID:     c.nodeID,
         Logger: c.logger.WithField("logname", "forwarder"),
     }); err != nil {
         c.Shutdown()
@@ -321,7 +326,7 @@ func New(config Config) (Cluster, error) {
     peers := []raft.Peer{}

     for _, p := range config.Peers {
-        if p.ID == config.ID && p.Address == config.Address {
+        if p.ID == config.NodeID && p.Address == config.Address {
             continue
         }
@@ -336,7 +341,7 @@ func New(config Config) (Cluster, error) {
     c.raftEmergencyNotifyCh = make(chan bool, 16)

     raft, err := raft.New(raft.Config{
-        ID:      config.ID,
+        ID:      config.NodeID,
         Path:    config.Path,
         Address: config.Address,
         Peers:   peers,
@@ -352,9 +357,9 @@ func New(config Config) (Cluster, error) {
     c.raft = raft

-    if len(config.Peers) != 0 {
-        for i := 0; i < len(config.Peers); i++ {
-            peerAddress, err := c.ClusterAPIAddress(config.Peers[i].Address)
+    if len(peers) != 0 {
+        for _, p := range peers {
+            peerAddress, err := c.ClusterAPIAddress(p.Address)
             if err != nil {
                 c.Shutdown()
                 return nil, err
@@ -369,7 +374,7 @@ func New(config Config) (Cluster, error) {
             case <-c.shutdownCh:
                 return
             case <-ticker.C:
-                err := c.Join("", c.id, c.raftAddress, peerAddress)
+                err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
                 if err != nil {
                     c.logger.Warn().WithError(err).Log("Join cluster")
                     continue
@@ -666,8 +671,8 @@ func (c *cluster) IsRaftLeader() bool {
 }

 func (c *cluster) IsDegraded() (bool, error) {
-    c.stateLock.Lock()
-    defer c.stateLock.Unlock()
+    c.stateLock.RLock()
+    defer c.stateLock.RUnlock()

     if c.isDegraded {
         return c.isDegraded, c.isDegradedErr
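
stateLock is promoted from sync.Mutex to sync.RWMutex: readers such as IsDegraded now take only a read lock and no longer block each other, while the single writer in trackNodeChanges keeps the exclusive lock. A self-contained sketch of the pattern (field and method names are illustrative):

package main

import (
    "fmt"
    "sync"
)

// state guards read-mostly fields with an RWMutex, mirroring the
// stateLock change in this hunk.
type state struct {
    lock        sync.RWMutex
    degraded    bool
    degradedErr error
    hostnames   []string
}

// IsDegraded takes only a read lock, so concurrent readers proceed in parallel.
func (s *state) IsDegraded() (bool, error) {
    s.lock.RLock()
    defer s.lock.RUnlock()
    return s.degraded, s.degradedErr
}

// update takes the exclusive lock, as the single writer does in trackNodeChanges.
func (s *state) update(degraded bool, err error, hostnames []string) {
    s.lock.Lock()
    defer s.lock.Unlock()
    s.degraded = degraded
    s.degradedErr = err
    s.hostnames = hostnames
}

func main() {
    s := &state{}
    s.update(false, nil, []string{"stream.example.com"})
    fmt.Println(s.IsDegraded()) // false <nil>
}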
@@ -707,7 +712,7 @@ func (c *cluster) Leave(origin, id string) error {
     }

     if len(id) == 0 {
-        id = c.id
+        id = c.nodeID
     }

     c.logger.Debug().WithFields(log.Fields{
@@ -762,7 +767,7 @@ func (c *cluster) Leave(origin, id string) error {
     numPeers := len(servers)

-    if id == c.id {
+    if id == c.nodeID {
         // We're going to remove ourselves
         if numPeers <= 1 {
             // Don't do so if we're the only server in the cluster
@@ -1014,15 +1019,17 @@ func (c *cluster) trackNodeChanges() {
             c.nodesLock.Unlock()

             // Put the cluster in "degraded" mode in case there's a mismatch in expected values
-            _, err = c.checkClusterNodes()
+            hostnames, err := c.checkClusterNodes()

             c.stateLock.Lock()
             if err != nil {
                 c.isDegraded = true
                 c.isDegradedErr = err
+                c.hostnames = []string{}
             } else {
                 c.isDegraded = false
                 c.isDegradedErr = nil
+                c.hostnames = hostnames
             }
             c.stateLock.Unlock()
@@ -1044,11 +1051,11 @@ func (c *cluster) trackNodeChanges() {
         }
     }

-// checkClusterNodes returns a list of all hostnames configured on all nodes. The
+// checkClusterNodes returns a list of hostnames that are configured on all nodes. The
 // returned list will not contain any duplicates. An error is returned in case the
 // node is not compatible.
 func (c *cluster) checkClusterNodes() ([]string, error) {
-    hostnames := map[string]struct{}{}
+    hostnames := map[string]int{}

     c.nodesLock.RLock()
     defer c.nodesLock.RUnlock()
@@ -1082,13 +1089,17 @@ func (c *cluster) checkClusterNodes() ([]string, error) {
     }

         for _, name := range config.Host.Name {
-            hostnames[name] = struct{}{}
+            hostnames[name]++
         }
     }

     names := []string{}

-    for key := range hostnames {
+    for key, value := range hostnames {
+        if value != len(c.nodes) {
+            continue
+        }
         names = append(names, key)
     }
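
The hostname map changes from a set (map[string]struct{}) to a counter (map[string]int): every node increments the count for each hostname it serves, and only names counted once per node — i.e. configured on all nodes — make it into the result. A standalone sketch of this counting intersection; it additionally deduplicates per node, which the original can take for granted if config.Host.Name holds no duplicates:

package main

import "fmt"

// commonHostnames returns the hostnames present on every node, using the
// same counting technique as the new checkClusterNodes: count each name
// once per node and keep those seen len(nodes) times.
func commonHostnames(nodes map[string][]string) []string {
    counts := map[string]int{}
    for _, hostnames := range nodes {
        seen := map[string]struct{}{}
        for _, name := range hostnames {
            if _, ok := seen[name]; ok {
                continue
            }
            seen[name] = struct{}{}
            counts[name]++
        }
    }

    names := []string{}
    for name, count := range counts {
        if count != len(nodes) {
            continue
        }
        names = append(names, name)
    }

    return names
}

func main() {
    nodes := map[string][]string{
        "node-a": {"stream.example.com", "a.example.com"},
        "node-b": {"stream.example.com", "b.example.com"},
    }
    // Only "stream.example.com" is configured on all nodes.
    fmt.Println(commonHostnames(nodes))
}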
@@ -1177,6 +1188,10 @@ func verifyClusterConfig(local, remote *config.Config) error {
return fmt.Errorf("cluster.enable is different")
}
if local.Cluster.ID != remote.Cluster.ID {
return fmt.Errorf("cluster.id is different")
}
if local.Cluster.SyncInterval != remote.Cluster.SyncInterval {
return fmt.Errorf("cluster.sync_interval_sec is different")
}
@@ -1356,11 +1371,18 @@ type ClusterNodeCore struct {
     Latency time.Duration
 }

+type ClusterAboutLeader struct {
+    ID           string
+    Address      string
+    ElectedSince time.Duration
+}

 type ClusterAbout struct {
     ID          string
-    Name        string
-    Leader      bool
-    Address     string
+    NodeID      string
+    Domains     []string
+    Leader      ClusterAboutLeader
+    Status      string
     Raft        ClusterRaft
     Nodes       []ClusterNode
     Version     ClusterVersion
@@ -1373,15 +1395,22 @@ func (c *cluster) About() (ClusterAbout, error) {
     about := ClusterAbout{
         ID:          c.id,
+        NodeID:      c.nodeID,
+        Leader:      ClusterAboutLeader{},
+        Status:      "online",
-        Version:     Version,
         Degraded:    degraded,
         DegradedErr: degradedErr,
+        Version:     Version,
     }

-    if address, err := c.ClusterAPIAddress(""); err == nil {
-        about.Address = address
+    if about.Degraded {
+        about.Status = "offline"
     }

+    c.stateLock.RLock()
+    about.Domains = slices.Copy(c.hostnames)
+    c.stateLock.RUnlock()

     stats := c.raft.Stats()
     about.Raft.Address = stats.Address
stats := c.raft.Stats()
about.Raft.Address = stats.Address
@@ -1400,6 +1429,12 @@ func (c *cluster) About() (ClusterAbout, error) {
     for _, s := range servers {
         serversMap[s.ID] = s

+        if s.Leader {
+            about.Leader.ID = s.ID
+            about.Leader.Address = s.Address
+            about.Leader.ElectedSince = s.LastChange
+        }
     }

     c.nodesLock.RLock()
c.nodesLock.RLock()
@@ -1435,10 +1470,6 @@ func (c *cluster) About() (ClusterAbout, error) {
             },
         }

-        if id == c.id {
-            about.Name = nodeAbout.Name
-        }

         if s, ok := serversMap[id]; ok {
             node.Voter = s.Voter
             node.Leader = s.Leader

View File

@@ -189,7 +189,7 @@ func (c *cluster) monitorLeadership() {
 // leadershipTransfer tries to transfer the leadership to another node e.g. in order
 // to do a graceful shutdown.
 func (c *cluster) leadershipTransfer(id string) error {
-    if id == c.id {
+    if id == c.nodeID {
         return nil
     }

View File

@@ -178,6 +178,7 @@ type AboutCore struct {
     Uptime      time.Duration
     LastContact time.Duration
     Latency     time.Duration
+    Version     string
 }

 type About struct {
@@ -229,6 +230,7 @@ func (n *node) About() About {
     }

     a.Core.Error = coreError
     a.Core.Latency = about.Latency
+    a.Core.Version = about.Version
     a.Resources = about.Resources

     return a

View File

@@ -85,6 +85,7 @@ type NodeAbout struct {
     LastContact time.Time
     Latency     time.Duration
     Resources   NodeResources
+    Version     string
 }

 type NodeVersion struct {
@@ -517,6 +518,7 @@ func (n *node) About() NodeAbout {
         createdAt = time.Now()
     }

     name := n.peerAbout.Name
+    version := n.peerAbout.Version.Number
     n.peerLock.RUnlock()

     n.stateLock.RLock()
@@ -546,6 +548,7 @@ func (n *node) About() NodeAbout {
             MemLimit: n.resources.memLimit,
             Error:    n.resources.err,
         },
+        Version: version,
     }

     if state == stateDisconnected {

View File

@@ -64,8 +64,9 @@ type raft struct {
     leadershipNotifyCh  chan bool
     leaderObservationCh chan string

-    isLeader   bool
-    leaderLock sync.Mutex
+    isLeader              bool
+    lastLeaderObservation time.Time
+    leaderLock            sync.RWMutex

     logger log.Logger
 }
@@ -76,10 +77,11 @@ type Peer struct {
 }

 type Server struct {
-    ID      string
-    Address string
-    Voter   bool
-    Leader  bool
+    ID         string
+    Address    string
+    Voter      bool
+    Leader     bool
+    LastChange time.Duration
 }

 type Stats struct {
@@ -160,8 +162,8 @@ func (r *raft) Shutdown() {
 }

 func (r *raft) IsLeader() bool {
-    r.leaderLock.Lock()
-    defer r.leaderLock.Unlock()
+    r.leaderLock.RLock()
+    defer r.leaderLock.RUnlock()

     return r.isLeader
 }
@@ -190,6 +192,12 @@ func (r *raft) Servers() ([]Server, error) {
             Leader:  string(server.ID) == leaderID,
         }

+        if node.Leader {
+            r.leaderLock.RLock()
+            node.LastChange = time.Since(r.lastLeaderObservation)
+            r.leaderLock.RUnlock()
+        }

         servers = append(servers, node)
     }
@@ -541,6 +549,10 @@ func (r *raft) trackLeaderChanges() {
"address": leaderObs.LeaderAddr,
}).Log("New leader observation")
r.leaderLock.Lock()
r.lastLeaderObservation = time.Now()
r.leaderLock.Unlock()
leaderAddress := string(leaderObs.LeaderAddr)
if r.leaderObservationCh != nil {
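
ElectedSince is bookkeeping on top of raft's leader observations: the observation loop timestamps each new leader, and Servers() reports time.Since(...) as LastChange for the current one. A minimal sketch of that bookkeeping (names are illustrative):

package main

import (
    "fmt"
    "sync"
    "time"
)

// leaderClock records when the current leader was last observed,
// mirroring lastLeaderObservation in this hunk.
type leaderClock struct {
    lock sync.RWMutex
    last time.Time
}

// observe is called from the leader-observation loop under the write lock.
func (c *leaderClock) observe() {
    c.lock.Lock()
    c.last = time.Now()
    c.lock.Unlock()
}

// electedSince is what Servers() reports as LastChange for the leader.
func (c *leaderClock) electedSince() time.Duration {
    c.lock.RLock()
    defer c.lock.RUnlock()
    return time.Since(c.last)
}

func main() {
    c := &leaderClock{}
    c.observe()
    time.Sleep(10 * time.Millisecond)
    fmt.Println(c.electedSince() > 0) // true
}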

View File

@@ -37,7 +37,7 @@ func ParseClusterVersion(version string) (ClusterVersion, error) {
 // Version of the cluster
 var Version = ClusterVersion{
-    Major: 1,
+    Major: 2,
     Minor: 0,
-    Patch: 2,
+    Patch: 0,
 }
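
Major jumps from 1 to 2 (1.0.2 → 2.0.0) because the about API changed shape. A sketch of a semver-style compatibility check between nodes; the major-version-equality rule is an assumption here, not taken from this diff:

package main

import "fmt"

// ClusterVersion is a simplified mirror of the struct in this file.
type ClusterVersion struct {
    Major int
    Minor int
    Patch int
}

func (v ClusterVersion) String() string {
    return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}

// Compatible applies an assumed rule: nodes must agree on the major
// version; minor and patch differences are tolerated.
func (v ClusterVersion) Compatible(other ClusterVersion) bool {
    return v.Major == other.Major
}

func main() {
    old := ClusterVersion{Major: 1, Minor: 0, Patch: 2}
    cur := ClusterVersion{Major: 2, Minor: 0, Patch: 0}
    fmt.Println(old, cur, old.Compatible(cur)) // 1.0.2 2.0.0 false
}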

View File

@@ -292,6 +292,7 @@ func (d *Config) init() {
     // Cluster
     d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false)
+    d.vars.Register(value.NewString(&d.Cluster.ID, ""), "cluster.id", "CORE_CLUSTER_ID", nil, "Cluster ID", false, false)
     d.vars.Register(value.NewFullAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false)
     d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false)
     d.vars.Register(value.NewInt64(&d.Cluster.StartupTimeout, 300), "cluster.startup_timeout_sec", "CORE_CLUSTER_STARTUP_TIMEOUT_SEC", nil, "Timeout for the cluster startup (leader election, acquiring certificates)", true, false)

View File

@@ -184,6 +184,7 @@ type Data struct {
} `json:"resources"`
Cluster struct {
Enable bool `json:"enable"`
ID string `json:"id"`
Address string `json:"address"` // ip:port
Peers []string `json:"peers"`
StartupTimeout int64 `json:"startup_timeout_sec" format:"int64"` // seconds
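
Together with the registration above, the cluster ID can now come from the cluster.id key in the stored config or the CORE_CLUSTER_ID environment variable. An illustrative fragment of the cluster section as serialized by this struct (all values are placeholders):

{
  "cluster": {
    "enable": true,
    "id": "cluster-1",
    "address": "10.0.0.1:8000",
    "peers": ["10.0.0.2:8000", "10.0.0.3:8000"],
    "startup_timeout_sec": 300
  }
}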

View File

@@ -48,16 +48,23 @@ type ClusterRaft struct {
     LogIndex uint64 `json:"log_index"`
 }

+type ClusterAboutLeader struct {
+    ID           string `json:"id"`
+    Address      string `json:"address"`
+    ElectedSince uint64 `json:"elected_seconds"`
+}

 type ClusterAbout struct {
-    ID          string        `json:"id"`
-    Name        string        `json:"name"`
-    Leader      bool          `json:"leader"`
-    Address     string        `json:"address"`
-    Raft        ClusterRaft   `json:"raft"`
-    Nodes       []ClusterNode `json:"nodes"`
-    Version     string        `json:"version"`
-    Degraded    bool          `json:"degraded"`
-    DegradedErr string        `json:"degraded_error"`
+    ID          string             `json:"id"`
+    NodeID      string             `json:"node_id"`
+    Domains     []string           `json:"public_domains"`
+    Leader      ClusterAboutLeader `json:"leader"`
+    Status      string             `json:"status"`
+    Raft        ClusterRaft        `json:"raft"`
+    Nodes       []ClusterNode      `json:"nodes"`
+    Version     string             `json:"version"`
+    Degraded    bool               `json:"degraded"`
+    DegradedErr string             `json:"degraded_error"`
 }

 type ClusterNodeFiles struct {
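
With these tags, an about response now serializes roughly as follows (illustrative values; the raft and nodes fields are omitted here for brevity):

{
  "id": "cluster-1",
  "node_id": "node-a",
  "public_domains": ["stream.example.com"],
  "leader": {
    "id": "node-a",
    "address": "10.0.0.1:8000",
    "elected_seconds": 3600
  },
  "status": "online",
  "version": "2.0.0",
  "degraded": false,
  "degraded_error": ""
}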

View File

@@ -58,9 +58,14 @@ func (h *ClusterHandler) About(c echo.Context) error {
     about := api.ClusterAbout{
         ID:      state.ID,
-        Name:    state.Name,
-        Leader:  state.Leader,
-        Address: state.Address,
+        NodeID:  state.NodeID,
+        Domains: state.Domains,
+        Leader: api.ClusterAboutLeader{
+            ID:           state.Leader.ID,
+            Address:      state.Leader.Address,
+            ElectedSince: uint64(state.Leader.ElectedSince.Seconds()),
+        },
+        Status: state.Status,
         Raft: api.ClusterRaft{
             Address: state.Raft.Address,
             State:   state.Raft.State,
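
The handler flattens the cluster layer's time.Duration into whole seconds for the elected_seconds field. The conversion in isolation:

package main

import (
    "fmt"
    "time"
)

func main() {
    // ElectedSince arrives as a time.Duration from the cluster layer.
    electedSince := 90*time.Second + 500*time.Millisecond

    // The handler exposes it as uint64 whole seconds, truncating
    // the fractional part: uint64(90.5) == 90.
    fmt.Println(uint64(electedSince.Seconds())) // 90
}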