diff --git a/app/api/api.go b/app/api/api.go index d6b54eff..b3021f74 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -530,7 +530,8 @@ func (a *api) start(ctx context.Context) error { } cluster, err := cluster.New(cluster.Config{ - ID: cfg.ID, + ID: cfg.Cluster.ID, + NodeID: cfg.ID, Name: cfg.Name, Path: filepath.Join(cfg.DB.Dir, "cluster"), Address: cfg.Cluster.Address, diff --git a/cluster/cluster.go b/cluster/cluster.go index 0d2c1a3f..cd6042d9 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -30,6 +30,7 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" + "github.com/datarhei/core/v16/slices" ) type Cluster interface { @@ -102,7 +103,8 @@ type DebugConfig struct { } type Config struct { - ID string // ID of the node + ID string // ID of the cluster + NodeID string // ID of the node Name string // Name of the node Path string // Path where to store all cluster data Address string // Listen address for the raft protocol @@ -122,9 +124,10 @@ type Config struct { } type cluster struct { - id string - name string - path string + id string + nodeID string + name string + path string logger log.Logger @@ -159,7 +162,8 @@ type cluster struct { isDegradedErr error isCoreDegraded bool isCoreDegradedErr error - stateLock sync.Mutex + hostnames []string + stateLock sync.RWMutex isRaftLeader bool hasRaftLeader bool @@ -186,6 +190,7 @@ var ErrDegraded = errors.New("cluster is currently degraded") func New(config Config) (Cluster, error) { c := &cluster{ id: config.ID, + nodeID: config.NodeID, name: config.Name, path: config.Path, logger: config.Logger, @@ -277,7 +282,7 @@ func New(config Config) (Cluster, error) { c.store = store api, err := NewAPI(APIConfig{ - ID: c.id, + ID: c.nodeID, Cluster: c, Logger: c.logger.WithField("logname", "api"), }) @@ -292,7 +297,7 @@ func New(config Config) (Cluster, error) { c.api = api nodeproxy, err := proxy.NewProxy(proxy.ProxyConfig{ - ID: c.id, + ID: c.nodeID, Logger: c.logger.WithField("logname", "proxy"), }) if err != nil { @@ -307,7 +312,7 @@ func New(config Config) (Cluster, error) { c.proxy = nodeproxy if forwarder, err := forwarder.New(forwarder.ForwarderConfig{ - ID: c.id, + ID: c.nodeID, Logger: c.logger.WithField("logname", "forwarder"), }); err != nil { c.Shutdown() @@ -321,7 +326,7 @@ func New(config Config) (Cluster, error) { peers := []raft.Peer{} for _, p := range config.Peers { - if p.ID == config.ID && p.Address == config.Address { + if p.ID == config.NodeID && p.Address == config.Address { continue } @@ -336,7 +341,7 @@ func New(config Config) (Cluster, error) { c.raftEmergencyNotifyCh = make(chan bool, 16) raft, err := raft.New(raft.Config{ - ID: config.ID, + ID: config.NodeID, Path: config.Path, Address: config.Address, Peers: peers, @@ -352,9 +357,9 @@ func New(config Config) (Cluster, error) { c.raft = raft - if len(config.Peers) != 0 { - for i := 0; i < len(config.Peers); i++ { - peerAddress, err := c.ClusterAPIAddress(config.Peers[i].Address) + if len(peers) != 0 { + for _, p := range peers { + peerAddress, err := c.ClusterAPIAddress(p.Address) if err != nil { c.Shutdown() return nil, err @@ -369,7 +374,7 @@ func New(config Config) (Cluster, error) { case <-c.shutdownCh: return case <-ticker.C: - err := c.Join("", c.id, c.raftAddress, peerAddress) + err := c.Join("", c.nodeID, c.raftAddress, peerAddress) if err != nil { c.logger.Warn().WithError(err).Log("Join cluster") continue @@ -666,8 +671,8 @@ func (c *cluster) IsRaftLeader() bool { } func (c *cluster) IsDegraded() (bool, error) { - c.stateLock.Lock() - defer c.stateLock.Unlock() + c.stateLock.RLock() + defer c.stateLock.RUnlock() if c.isDegraded { return c.isDegraded, c.isDegradedErr @@ -707,7 +712,7 @@ func (c *cluster) Leave(origin, id string) error { } if len(id) == 0 { - id = c.id + id = c.nodeID } c.logger.Debug().WithFields(log.Fields{ @@ -762,7 +767,7 @@ func (c *cluster) Leave(origin, id string) error { numPeers := len(servers) - if id == c.id { + if id == c.nodeID { // We're going to remove ourselves if numPeers <= 1 { // Don't do so if we're the only server in the cluster @@ -1014,15 +1019,17 @@ func (c *cluster) trackNodeChanges() { c.nodesLock.Unlock() // Put the cluster in "degraded" mode in case there's a mismatch in expected values - _, err = c.checkClusterNodes() + hostnames, err := c.checkClusterNodes() c.stateLock.Lock() if err != nil { c.isDegraded = true c.isDegradedErr = err + c.hostnames = []string{} } else { c.isDegraded = false c.isDegradedErr = nil + c.hostnames = hostnames } c.stateLock.Unlock() @@ -1044,11 +1051,11 @@ func (c *cluster) trackNodeChanges() { } } -// checkClusterNodes returns a list of all hostnames configured on all nodes. The +// checkClusterNodes returns a list of hostnames that are configured on all nodes. The // returned list will not contain any duplicates. An error is returned in case the // node is not compatible. func (c *cluster) checkClusterNodes() ([]string, error) { - hostnames := map[string]struct{}{} + hostnames := map[string]int{} c.nodesLock.RLock() defer c.nodesLock.RUnlock() @@ -1082,13 +1089,17 @@ func (c *cluster) checkClusterNodes() ([]string, error) { } for _, name := range config.Host.Name { - hostnames[name] = struct{}{} + hostnames[name]++ } } names := []string{} - for key := range hostnames { + for key, value := range hostnames { + if value != len(c.nodes) { + continue + } + names = append(names, key) } @@ -1177,6 +1188,10 @@ func verifyClusterConfig(local, remote *config.Config) error { return fmt.Errorf("cluster.enable is different") } + if local.Cluster.ID != remote.Cluster.ID { + return fmt.Errorf("cluster.id is different") + } + if local.Cluster.SyncInterval != remote.Cluster.SyncInterval { return fmt.Errorf("cluster.sync_interval_sec is different") } @@ -1356,11 +1371,18 @@ type ClusterNodeCore struct { Latency time.Duration } +type ClusterAboutLeader struct { + ID string + Address string + ElectedSince time.Duration +} + type ClusterAbout struct { ID string - Name string - Leader bool - Address string + NodeID string + Domains []string + Leader ClusterAboutLeader + Status string Raft ClusterRaft Nodes []ClusterNode Version ClusterVersion @@ -1373,15 +1395,22 @@ func (c *cluster) About() (ClusterAbout, error) { about := ClusterAbout{ ID: c.id, + NodeID: c.nodeID, + Leader: ClusterAboutLeader{}, + Status: "online", + Version: Version, Degraded: degraded, DegradedErr: degradedErr, - Version: Version, } - if address, err := c.ClusterAPIAddress(""); err == nil { - about.Address = address + if about.Degraded { + about.Status = "offline" } + c.stateLock.RLock() + about.Domains = slices.Copy(c.hostnames) + c.stateLock.RUnlock() + stats := c.raft.Stats() about.Raft.Address = stats.Address @@ -1400,6 +1429,12 @@ func (c *cluster) About() (ClusterAbout, error) { for _, s := range servers { serversMap[s.ID] = s + + if s.Leader { + about.Leader.ID = s.ID + about.Leader.Address = s.Address + about.Leader.ElectedSince = s.LastChange + } } c.nodesLock.RLock() @@ -1435,10 +1470,6 @@ func (c *cluster) About() (ClusterAbout, error) { }, } - if id == c.id { - about.Name = nodeAbout.Name - } - if s, ok := serversMap[id]; ok { node.Voter = s.Voter node.Leader = s.Leader diff --git a/cluster/leader.go b/cluster/leader.go index 79cf1819..14cd2d33 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -189,7 +189,7 @@ func (c *cluster) monitorLeadership() { // leadershipTransfer tries to transfer the leadership to another node e.g. in order // to do a graceful shutdown. func (c *cluster) leadershipTransfer(id string) error { - if id == c.id { + if id == c.nodeID { return nil } diff --git a/cluster/node/node.go b/cluster/node/node.go index dca02a2f..a361edbe 100644 --- a/cluster/node/node.go +++ b/cluster/node/node.go @@ -178,6 +178,7 @@ type AboutCore struct { Uptime time.Duration LastContact time.Duration Latency time.Duration + Version string } type About struct { @@ -229,6 +230,7 @@ func (n *node) About() About { } a.Core.Error = coreError a.Core.Latency = about.Latency + a.Core.Version = about.Version a.Resources = about.Resources return a diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 60a2a71d..72891348 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -85,6 +85,7 @@ type NodeAbout struct { LastContact time.Time Latency time.Duration Resources NodeResources + Version string } type NodeVersion struct { @@ -517,6 +518,7 @@ func (n *node) About() NodeAbout { createdAt = time.Now() } name := n.peerAbout.Name + version := n.peerAbout.Version.Number n.peerLock.RUnlock() n.stateLock.RLock() @@ -546,6 +548,7 @@ func (n *node) About() NodeAbout { MemLimit: n.resources.memLimit, Error: n.resources.err, }, + Version: version, } if state == stateDisconnected { diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index f92bcb98..5738e6c0 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -64,8 +64,9 @@ type raft struct { leadershipNotifyCh chan bool leaderObservationCh chan string - isLeader bool - leaderLock sync.Mutex + isLeader bool + lastLeaderObservation time.Time + leaderLock sync.RWMutex logger log.Logger } @@ -76,10 +77,11 @@ type Peer struct { } type Server struct { - ID string - Address string - Voter bool - Leader bool + ID string + Address string + Voter bool + Leader bool + LastChange time.Duration } type Stats struct { @@ -160,8 +162,8 @@ func (r *raft) Shutdown() { } func (r *raft) IsLeader() bool { - r.leaderLock.Lock() - defer r.leaderLock.Unlock() + r.leaderLock.RLock() + defer r.leaderLock.RUnlock() return r.isLeader } @@ -190,6 +192,12 @@ func (r *raft) Servers() ([]Server, error) { Leader: string(server.ID) == leaderID, } + if node.Leader { + r.leaderLock.RLock() + node.LastChange = time.Since(r.lastLeaderObservation) + r.leaderLock.RUnlock() + } + servers = append(servers, node) } @@ -541,6 +549,10 @@ func (r *raft) trackLeaderChanges() { "address": leaderObs.LeaderAddr, }).Log("New leader observation") + r.leaderLock.Lock() + r.lastLeaderObservation = time.Now() + r.leaderLock.Unlock() + leaderAddress := string(leaderObs.LeaderAddr) if r.leaderObservationCh != nil { diff --git a/cluster/version.go b/cluster/version.go index b3dba8a1..0748b0b2 100644 --- a/cluster/version.go +++ b/cluster/version.go @@ -37,7 +37,7 @@ func ParseClusterVersion(version string) (ClusterVersion, error) { // Version of the cluster var Version = ClusterVersion{ - Major: 1, + Major: 2, Minor: 0, - Patch: 2, + Patch: 0, } diff --git a/config/config.go b/config/config.go index 289ed0cd..317da336 100644 --- a/config/config.go +++ b/config/config.go @@ -292,6 +292,7 @@ func (d *Config) init() { // Cluster d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false) + d.vars.Register(value.NewString(&d.Cluster.ID, ""), "cluster.id", "CORE_CLUSTER_ID", nil, "Cluster ID", false, false) d.vars.Register(value.NewFullAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false) d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false) d.vars.Register(value.NewInt64(&d.Cluster.StartupTimeout, 300), "cluster.startup_timeout_sec", "CORE_CLUSTER_STARTUP_TIMEOUT_SEC", nil, "Timeout for the cluster startup (leader election, acquiring certificates)", true, false) diff --git a/config/data.go b/config/data.go index f7352030..887c8e98 100644 --- a/config/data.go +++ b/config/data.go @@ -184,6 +184,7 @@ type Data struct { } `json:"resources"` Cluster struct { Enable bool `json:"enable"` + ID string `json:"id"` Address string `json:"address"` // ip:port Peers []string `json:"peers"` StartupTimeout int64 `json:"startup_timeout_sec" format:"int64"` // seconds diff --git a/http/api/cluster.go b/http/api/cluster.go index 8cdc878c..e34ce53a 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -48,16 +48,23 @@ type ClusterRaft struct { LogIndex uint64 `json:"log_index"` } +type ClusterAboutLeader struct { + ID string `json:"id"` + Address string `json:"address"` + ElectedSince uint64 `json:"elected_seconds"` +} + type ClusterAbout struct { - ID string `json:"id"` - Name string `json:"name"` - Leader bool `json:"leader"` - Address string `json:"address"` - Raft ClusterRaft `json:"raft"` - Nodes []ClusterNode `json:"nodes"` - Version string `json:"version"` - Degraded bool `json:"degraded"` - DegradedErr string `json:"degraded_error"` + ID string `json:"id"` + NodeID string `json:"node_id"` + Domains []string `json:"public_domains"` + Leader ClusterAboutLeader `json:"leader"` + Status string `json:"status"` + Raft ClusterRaft `json:"raft"` + Nodes []ClusterNode `json:"nodes"` + Version string `json:"version"` + Degraded bool `json:"degraded"` + DegradedErr string `json:"degraded_error"` } type ClusterNodeFiles struct { diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 133ad6a2..a1fb5730 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -58,9 +58,14 @@ func (h *ClusterHandler) About(c echo.Context) error { about := api.ClusterAbout{ ID: state.ID, - Name: state.Name, - Leader: state.Leader, - Address: state.Address, + NodeID: state.NodeID, + Domains: state.Domains, + Leader: api.ClusterAboutLeader{ + ID: state.Leader.ID, + Address: state.Leader.Address, + ElectedSince: uint64(state.Leader.ElectedSince.Seconds()), + }, + Status: state.Status, Raft: api.ClusterRaft{ Address: state.Raft.Address, State: state.Raft.State,