diff --git a/cluster/api.go b/cluster/api.go index 8654a443..86d20570 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -95,6 +95,14 @@ func NewAPI(config APIConfig) (API, error) { doc := a.router.Group("/v1/swagger/*") doc.GET("", echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI"))) + a.router.GET("/", func(c echo.Context) error { + return c.JSON(http.StatusOK, Version.String()) + }) + + a.router.GET("/v1/about", func(c echo.Context) error { + return c.JSON(http.StatusOK, Version.String()) + }) + a.router.POST("/v1/server", a.AddServer) a.router.DELETE("/v1/server/:id", a.RemoveServer) diff --git a/cluster/client/client.go b/cluster/client/client.go index 4d15a1e1..08a34ed2 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -48,8 +48,23 @@ type APIClient struct { Client *http.Client } +func (c *APIClient) Version() (string, error) { + data, err := c.call(http.MethodGet, "/", "", nil, "") + if err != nil { + return "", err + } + + var version string + err = json.Unmarshal(data, &version) + if err != nil { + return "", err + } + + return version, nil +} + func (c *APIClient) CoreAPIAddress() (string, error) { - data, err := c.call(http.MethodGet, "/core", "", nil, "") + data, err := c.call(http.MethodGet, "/v1/core", "", nil, "") if err != nil { return "", err } @@ -69,13 +84,13 @@ func (c *APIClient) Join(origin string, r JoinRequest) error { return err } - _, err = c.call(http.MethodPost, "/server", "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPost, "/v1/server", "application/json", bytes.NewReader(data), origin) return err } func (c *APIClient) Leave(origin string, id string) error { - _, err := c.call(http.MethodDelete, "/server/"+id, "application/json", nil, origin) + _, err := c.call(http.MethodDelete, "/v1/server/"+id, "application/json", nil, origin) return err } @@ -86,13 +101,13 @@ func (c *APIClient) AddProcess(origin string, r AddProcessRequest) error { return err } - _, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPost, "/v1/process", "application/json", bytes.NewReader(data), origin) return err } func (c *APIClient) RemoveProcess(origin string, id app.ProcessID) error { - _, err := c.call(http.MethodDelete, "/process/"+id.ID+"?domain="+id.Domain, "application/json", nil, origin) + _, err := c.call(http.MethodDelete, "/v1/process/"+id.ID+"?domain="+id.Domain, "application/json", nil, origin) return err } @@ -103,7 +118,7 @@ func (c *APIClient) UpdateProcess(origin string, id app.ProcessID, r UpdateProce return err } - _, err = c.call(http.MethodPut, "/process/"+id.ID+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPut, "/v1/process/"+id.ID+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin) return err } @@ -114,7 +129,7 @@ func (c *APIClient) SetProcessMetadata(origin string, id app.ProcessID, key stri return err } - _, err = c.call(http.MethodPut, "/process/"+id.ID+"/metadata/"+key+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPut, "/v1/process/"+id.ID+"/metadata/"+key+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin) return err } @@ -125,7 +140,7 @@ func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error { return err } - _, err = c.call(http.MethodPost, "/iam/user", "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPost, "/v1/iam/user", "application/json", bytes.NewReader(data), origin) return err } @@ -136,7 +151,7 @@ func (c *APIClient) UpdateIdentity(origin, name string, r UpdateIdentityRequest) return err } - _, err = c.call(http.MethodPut, "/iam/user/"+name, "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPut, "/v1/iam/user/"+name, "application/json", bytes.NewReader(data), origin) return err } @@ -147,19 +162,19 @@ func (c *APIClient) SetPolicies(origin, name string, r SetPoliciesRequest) error return err } - _, err = c.call(http.MethodPut, "/iam/user/"+name+"/policies", "application/json", bytes.NewReader(data), origin) + _, err = c.call(http.MethodPut, "/v1/iam/user/"+name+"/policies", "application/json", bytes.NewReader(data), origin) return err } func (c *APIClient) RemoveIdentity(origin string, name string) error { - _, err := c.call(http.MethodDelete, "/iam/user/"+name, "application/json", nil, origin) + _, err := c.call(http.MethodDelete, "/v1/iam/user/"+name, "application/json", nil, origin) return err } func (c *APIClient) Snapshot() (io.ReadCloser, error) { - return c.stream(http.MethodGet, "/snapshot", "", nil, "") + return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "") } func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) { @@ -167,7 +182,7 @@ func (c *APIClient) stream(method, path, contentType string, data io.Reader, ori return nil, fmt.Errorf("no address defined") } - address := "http://" + c.Address + "/v1" + path + address := "http://" + c.Address + path req, err := http.NewRequest(method, address, data) if err != nil { diff --git a/cluster/cluster.go b/cluster/cluster.go index 604065cb..66f5a0e3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,9 +3,11 @@ package cluster import ( "context" "encoding/json" + "errors" "fmt" "io" gonet "net" + "net/http" "net/url" "strconv" "sync" @@ -124,6 +126,9 @@ type cluster struct { coreAddress string + isDegraded bool + stateLock sync.Mutex + isRaftLeader bool hasRaftLeader bool isLeader bool @@ -133,6 +138,8 @@ type cluster struct { nodesLock sync.RWMutex } +var ErrDegraded = errors.New("cluster is currently degraded") + func New(config ClusterConfig) (Cluster, error) { c := &cluster{ id: config.ID, @@ -301,7 +308,14 @@ func (c *cluster) ClusterAPIAddress(raftAddress string) (string, error) { raftAddress = c.Address() } - host, port, _ := gonet.SplitHostPort(raftAddress) + return clusterAPIAddress(raftAddress) +} + +func clusterAPIAddress(raftAddress string) (string, error) { + host, port, err := gonet.SplitHostPort(raftAddress) + if err != nil { + return "", err + } p, err := strconv.Atoi(port) if err != nil { @@ -379,6 +393,13 @@ func (c *cluster) IsRaftLeader() bool { return c.isRaftLeader } +func (c *cluster) IsDegraded() bool { + c.stateLock.Lock() + defer c.stateLock.Unlock() + + return c.isDegraded +} + func (c *cluster) Leave(origin, id string) error { if len(id) == 0 { id = c.id @@ -524,19 +545,6 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { "address": raftAddress, }).Log("Received join request for remote server") - // connect to the peer's API in order to find out if our version is compatible - address, err := c.CoreAPIAddress(raftAddress) - if err != nil { - return fmt.Errorf("peer API doesn't respond: %w", err) - } - - node := proxy.NewNode(address) - err = node.Connect() - if err != nil { - return fmt.Errorf("couldn't connect to peer: %w", err) - } - defer node.Disconnect() - servers, err := c.raft.Servers() if err != nil { c.logger.Error().WithError(err).Log("Raft configuration") @@ -620,30 +628,44 @@ func (c *cluster) trackNodeChanges() { _, ok := c.nodes[id] if !ok { - address, err := c.CoreAPIAddress(server.Address) + logger := c.logger.WithFields(log.Fields{ + "id": server.ID, + "address": server.Address, + }) + + address, err := c.ClusterAPIAddress(server.Address) if err != nil { - c.logger.Warn().WithError(err).WithFields(log.Fields{ - "id": id, - "address": server.Address, - }).Log("Discovering core API address") + logger.Warn().WithError(err).Log("Discovering cluster API address") continue } - node := proxy.NewNode(address) + if !checkClusterVersion(address) { + logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") + } + + client := apiclient.APIClient{ + Address: address, + } + + coreAddress, err := client.CoreAPIAddress() + if err != nil { + logger.Warn().WithError(err).Log("Retrieve core API address") + continue + } + + node := proxy.NewNode(coreAddress) err = node.Connect() if err != nil { - c.logger.Warn().WithError(err).WithFields(log.Fields{ - "id": id, - "address": server.Address, - }).Log("Connecting to core API") + c.logger.Warn().WithError(err).Log("Connecting to core API") continue } + // TODO: Check constraints + if _, err := c.proxy.AddNode(id, node); err != nil { - c.logger.Warn().WithError(err).WithFields(log.Fields{ - "id": id, - "address": address, - }).Log("Adding node") + c.logger.Warn().WithError(err).Log("Adding node") + node.Disconnect() + continue } c.nodes[id] = node @@ -658,18 +680,94 @@ func (c *cluster) trackNodeChanges() { continue } - node.Disconnect() c.proxy.RemoveNode(id) + node.Disconnect() delete(c.nodes, id) } c.nodesLock.Unlock() + + // Put the cluster in "degraded" mode in case there's a version mismatch + isDegraded := !c.checkClusterVersions(servers) + + c.stateLock.Lock() + c.isDegraded = isDegraded + c.stateLock.Unlock() case <-c.shutdownCh: return } } } +func (c *cluster) checkClusterVersions(servers []raft.Server) bool { + ok := true + okChan := make(chan bool, 64) + + wgSummary := sync.WaitGroup{} + wgSummary.Add(1) + + go func() { + defer wgSummary.Done() + + for okServer := range okChan { + if !okServer { + ok = false + } + } + }() + + wg := sync.WaitGroup{} + + for _, server := range servers { + wg.Add(1) + + go func(server raft.Server, p chan<- bool) { + defer wg.Done() + + address, err := clusterAPIAddress(server.Address) + if err != nil { + p <- false + return + } + + p <- checkClusterVersion(address) + }(server, okChan) + } + + wg.Wait() + + close(okChan) + + wgSummary.Wait() + + return ok +} + +func checkClusterVersion(address string) bool { + client := apiclient.APIClient{ + Address: address, + Client: &http.Client{ + Timeout: 5 * time.Second, + }, + } + + v, err := client.Version() + if err != nil { + return false + } + + version, err := ParseClusterVersion(v) + if err != nil { + return false + } + + if !Version.Equal(version) { + return false + } + + return true +} + // trackLeaderChanges registers an Observer with raft in order to receive updates // about leader changes, in order to keep the forwarder up to date. func (c *cluster) trackLeaderChanges() { @@ -705,6 +803,10 @@ func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) { } func (c *cluster) AddProcess(origin string, config *app.Config) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.AddProcess(origin, config) } @@ -720,6 +822,10 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error { } func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.RemoveProcess(origin, id) } @@ -735,6 +841,10 @@ func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { } func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.UpdateProcess(origin, id, config) } @@ -751,6 +861,10 @@ func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Con } func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.SetProcessMetadata(origin, id, key, data) } @@ -784,7 +898,7 @@ func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (i Superuser: superuser, JWTRealm: jwtRealm, JWTSecret: jwtSecret, - Logger: c.logger.WithField("logname", "iam"), + Logger: c.logger.WithComponent("IAM"), }, c.store) if err != nil { return nil, fmt.Errorf("cluster iam: %w", err) @@ -822,6 +936,10 @@ func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) } func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.AddIdentity(origin, identity) } @@ -837,6 +955,10 @@ func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { } func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.UpdateIdentity(origin, name, identity) } @@ -853,6 +975,10 @@ func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) } func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.SetPolicies(origin, name, policies) } @@ -869,6 +995,10 @@ func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) } func (c *cluster) RemoveIdentity(origin string, name string) error { + if c.IsDegraded() { + return ErrDegraded + } + if !c.IsRaftLeader() { return c.forwarder.RemoveIdentity(origin, name) } @@ -897,32 +1027,42 @@ func (c *cluster) applyCommand(cmd *store.Command) error { return nil } -type ClusterServer struct { +type ClusterRaftServer struct { ID string Address string Voter bool Leader bool } -type ClusterStats struct { +type ClusterRaftStats struct { State string LastContact time.Duration NumPeers uint64 } +type ClusterRaft struct { + Server []ClusterRaftServer + Stats ClusterRaftStats +} + type ClusterAbout struct { ID string Address string ClusterAPIAddress string CoreAPIAddress string - Nodes []ClusterServer - Stats ClusterStats + Raft ClusterRaft + Nodes []proxy.NodeAbout + Version ClusterVersion + Degraded bool } func (c *cluster) About() (ClusterAbout, error) { + degraded := c.IsDegraded() + about := ClusterAbout{ - ID: c.id, - Address: c.Address(), + ID: c.id, + Address: c.Address(), + Degraded: degraded, } if address, err := c.ClusterAPIAddress(""); err == nil { @@ -935,9 +1075,9 @@ func (c *cluster) About() (ClusterAbout, error) { stats := c.raft.Stats() - about.Stats.State = stats.State - about.Stats.LastContact = stats.LastContact - about.Stats.NumPeers = stats.NumPeers + about.Raft.Stats.State = stats.State + about.Raft.Stats.LastContact = stats.LastContact + about.Raft.Stats.NumPeers = stats.NumPeers servers, err := c.raft.Servers() if err != nil { @@ -946,14 +1086,21 @@ func (c *cluster) About() (ClusterAbout, error) { } for _, server := range servers { - node := ClusterServer{ + node := ClusterRaftServer{ ID: server.ID, Address: server.Address, Voter: server.Voter, Leader: server.Leader, } - about.Nodes = append(about.Nodes, node) + about.Raft.Server = append(about.Raft.Server, node) + } + + about.Version = Version + + nodes := c.ProxyReader().ListNodes() + for _, node := range nodes { + about.Nodes = append(about.Nodes, node.About()) } return about, nil diff --git a/cluster/leader.go b/cluster/leader.go index 6fd32ca2..ac6747c1 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -333,6 +333,10 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval tim case <-ctx.Done(): return case <-ticker.C: + if c.IsDegraded() { + break + } + c.doSynchronize(emergency) if !emergency { @@ -551,7 +555,7 @@ func (c *cluster) doRebalance(emergency bool) { c.logger.Debug().WithFields(log.Fields{ "have": have, - "nodes": nodes, + "nodes": nodesMap, }).Log("Rebalance") opStack, _ := rebalance(have, nodesMap) diff --git a/cluster/node.go b/cluster/node.go new file mode 100644 index 00000000..9d1c7b06 --- /dev/null +++ b/cluster/node.go @@ -0,0 +1,116 @@ +package cluster + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/datarhei/core/v16/cluster/client" + "github.com/datarhei/core/v16/cluster/proxy" +) + +type clusterNode struct { + client client.APIClient + + version string + lastContact time.Time + latency time.Duration + pingLock sync.RWMutex + + runLock sync.Mutex + cancelPing context.CancelFunc + + proxyNode proxy.Node +} + +func NewClusterNode(address string) *clusterNode { + n := &clusterNode{ + version: "0.0.0", + client: client.APIClient{ + Address: address, + Client: &http.Client{ + Timeout: 5 * time.Second, + }, + }, + } + + return n +} + +func (n *clusterNode) Start() error { + n.runLock.Lock() + defer n.runLock.Unlock() + + if n.cancelPing != nil { + return nil + } + + version, err := n.client.Version() + if err != nil { + return err + } + + n.version = version + + address, err := n.CoreAPIAddress() + if err != nil { + return err + } + + n.proxyNode = proxy.NewNode(address) + + ctx, cancel := context.WithCancel(context.Background()) + n.cancelPing = cancel + + go n.ping(ctx) + + return nil +} + +func (n *clusterNode) Stop() error { + n.runLock.Lock() + defer n.runLock.Unlock() + + if n.cancelPing == nil { + return nil + } + + n.cancelPing() + n.cancelPing = nil + + return nil +} + +func (n *clusterNode) Version() string { + n.pingLock.RLock() + defer n.pingLock.RUnlock() + + return n.version +} + +func (n *clusterNode) CoreAPIAddress() (string, error) { + return n.client.CoreAPIAddress() +} + +func (n *clusterNode) ping(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + start := time.Now() + version, err := n.client.Version() + if err == nil { + n.pingLock.Lock() + n.version = version + n.lastContact = time.Now() + n.latency = time.Since(start) + n.pingLock.Unlock() + } + case <-ctx.Done(): + return + } + } +} diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 2c492708..c77a1a9b 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -167,8 +167,7 @@ func (n *node) Connect() error { } peer, err := client.New(client.Config{ - Address: n.address, - Auth0Token: "", + Address: n.address, Client: &http.Client{ Timeout: 5 * time.Second, }, @@ -566,25 +565,20 @@ func (n *node) files() { filesList := []string{} errorList := []error{} - ctx, cancel := context.WithCancel(context.Background()) - wgList := sync.WaitGroup{} wgList.Add(1) - go func(ctx context.Context) { + go func() { defer wgList.Done() - for { - select { - case <-ctx.Done(): - return - case file := <-filesChan: - filesList = append(filesList, file) - case err := <-errorsChan: - errorList = append(errorList, err) - } + for file := range filesChan { + filesList = append(filesList, file) } - }(ctx) + + for err := range errorsChan { + errorList = append(errorList, err) + } + }() wg := sync.WaitGroup{} wg.Add(2) @@ -687,7 +681,8 @@ func (n *node) files() { wg.Wait() - cancel() + close(filesChan) + close(errorsChan) wgList.Wait() diff --git a/cluster/store/store.go b/cluster/store/store.go index 7d0bb801..ab46f279 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -107,13 +107,8 @@ type CommandSetProcessNodeMap struct { Map map[string]string } -// Implement a FSM -type store struct { - lock sync.RWMutex - callback func(op Operation) - - logger log.Logger - +type storeData struct { + Version uint64 Process map[string]Process ProcessNodeMap map[string]string @@ -128,24 +123,43 @@ type store struct { } } +func (s *storeData) init() { + now := time.Now() + + s.Version = 1 + s.Process = map[string]Process{} + s.ProcessNodeMap = map[string]string{} + s.Users.UpdatedAt = now + s.Users.Users = map[string]identity.User{} + s.Policies.UpdatedAt = now + s.Policies.Policies = map[string][]access.Policy{} +} + +// store implements a raft.FSM +type store struct { + lock sync.RWMutex + callback func(op Operation) + + logger log.Logger + + data storeData +} + type Config struct { Logger log.Logger } func NewStore(config Config) (Store, error) { s := &store{ - Process: map[string]Process{}, - ProcessNodeMap: map[string]string{}, - logger: config.Logger, + logger: config.Logger, } - s.Users.Users = map[string]identity.User{} - s.Policies.Policies = map[string][]access.Policy{} - if s.logger == nil { s.logger = log.New("") } + s.data.init() + return s, nil } @@ -314,13 +328,13 @@ func (s *store) addProcess(cmd CommandAddProcess) error { return fmt.Errorf("the process with the ID '%s' must have limits defined", id) } - _, ok := s.Process[id] + _, ok := s.data.Process[id] if ok { return fmt.Errorf("the process with the ID '%s' already exists", id) } now := time.Now() - s.Process[id] = Process{ + s.data.Process[id] = Process{ CreatedAt: now, UpdatedAt: now, Config: cmd.Config, @@ -336,12 +350,12 @@ func (s *store) removeProcess(cmd CommandRemoveProcess) error { id := cmd.ID.String() - _, ok := s.Process[id] + _, ok := s.data.Process[id] if !ok { return fmt.Errorf("the process with the ID '%s' doesn't exist", id) } - delete(s.Process, id) + delete(s.data.Process, id) return nil } @@ -357,7 +371,7 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { return fmt.Errorf("the process with the ID '%s' must have limits defined", dstid) } - p, ok := s.Process[srcid] + p, ok := s.data.Process[srcid] if !ok { return fmt.Errorf("the process with the ID '%s' doesn't exists", srcid) } @@ -367,7 +381,7 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { } if srcid == dstid { - s.Process[srcid] = Process{ + s.data.Process[srcid] = Process{ UpdatedAt: time.Now(), Config: cmd.Config, } @@ -375,13 +389,13 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { return nil } - _, ok = s.Process[dstid] + _, ok = s.data.Process[dstid] if ok { return fmt.Errorf("the process with the ID '%s' already exists", dstid) } - delete(s.Process, srcid) - s.Process[dstid] = Process{ + delete(s.data.Process, srcid) + s.data.Process[dstid] = Process{ UpdatedAt: time.Now(), Config: cmd.Config, } @@ -395,7 +409,7 @@ func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error { id := cmd.ID.String() - p, ok := s.Process[id] + p, ok := s.data.Process[id] if !ok { return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID) } @@ -411,7 +425,7 @@ func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error { } p.UpdatedAt = time.Now() - s.Process[id] = p + s.data.Process[id] = p return nil } @@ -420,13 +434,13 @@ func (s *store) addIdentity(cmd CommandAddIdentity) error { s.lock.Lock() defer s.lock.Unlock() - _, ok := s.Users.Users[cmd.Identity.Name] + _, ok := s.data.Users.Users[cmd.Identity.Name] if ok { return fmt.Errorf("the identity with the name '%s' already exists", cmd.Identity.Name) } - s.Users.UpdatedAt = time.Now() - s.Users.Users[cmd.Identity.Name] = cmd.Identity + s.data.Users.UpdatedAt = time.Now() + s.data.Users.Users[cmd.Identity.Name] = cmd.Identity return nil } @@ -435,32 +449,32 @@ func (s *store) updateIdentity(cmd CommandUpdateIdentity) error { s.lock.Lock() defer s.lock.Unlock() - _, ok := s.Users.Users[cmd.Name] + _, ok := s.data.Users.Users[cmd.Name] if !ok { return fmt.Errorf("the identity with the name '%s' doesn't exist", cmd.Name) } if cmd.Name == cmd.Identity.Name { - s.Users.UpdatedAt = time.Now() - s.Users.Users[cmd.Identity.Name] = cmd.Identity + s.data.Users.UpdatedAt = time.Now() + s.data.Users.Users[cmd.Identity.Name] = cmd.Identity return nil } - _, ok = s.Users.Users[cmd.Identity.Name] + _, ok = s.data.Users.Users[cmd.Identity.Name] if ok { return fmt.Errorf("the identity with the name '%s' already exists", cmd.Identity.Name) } now := time.Now() - s.Users.UpdatedAt = now - s.Users.Users[cmd.Identity.Name] = cmd.Identity - s.Policies.UpdatedAt = now - s.Policies.Policies[cmd.Identity.Name] = s.Policies.Policies[cmd.Name] + s.data.Users.UpdatedAt = now + s.data.Users.Users[cmd.Identity.Name] = cmd.Identity + s.data.Policies.UpdatedAt = now + s.data.Policies.Policies[cmd.Identity.Name] = s.data.Policies.Policies[cmd.Name] - delete(s.Users.Users, cmd.Name) - delete(s.Policies.Policies, cmd.Name) + delete(s.data.Users.Users, cmd.Name) + delete(s.data.Policies.Policies, cmd.Name) return nil } @@ -469,10 +483,10 @@ func (s *store) removeIdentity(cmd CommandRemoveIdentity) error { s.lock.Lock() defer s.lock.Unlock() - delete(s.Users.Users, cmd.Name) - s.Users.UpdatedAt = time.Now() - delete(s.Policies.Policies, cmd.Name) - s.Policies.UpdatedAt = time.Now() + delete(s.data.Users.Users, cmd.Name) + s.data.Users.UpdatedAt = time.Now() + delete(s.data.Policies.Policies, cmd.Name) + s.data.Policies.UpdatedAt = time.Now() return nil } @@ -481,13 +495,13 @@ func (s *store) setPolicies(cmd CommandSetPolicies) error { s.lock.Lock() defer s.lock.Unlock() - if _, ok := s.Users.Users[cmd.Name]; !ok { + if _, ok := s.data.Users.Users[cmd.Name]; !ok { return fmt.Errorf("the identity with the name '%s' doesn't exist", cmd.Name) } - delete(s.Policies.Policies, cmd.Name) - s.Policies.Policies[cmd.Name] = cmd.Policies - s.Policies.UpdatedAt = time.Now() + delete(s.data.Policies.Policies, cmd.Name) + s.data.Policies.Policies[cmd.Name] = cmd.Policies + s.data.Policies.UpdatedAt = time.Now() return nil } @@ -496,7 +510,7 @@ func (s *store) setProcessNodeMap(cmd CommandSetProcessNodeMap) error { s.lock.Lock() defer s.lock.Unlock() - s.ProcessNodeMap = cmd.Map + s.data.ProcessNodeMap = cmd.Map return nil } @@ -514,7 +528,7 @@ func (s *store) Snapshot() (raft.FSMSnapshot, error) { s.lock.RLock() defer s.lock.RUnlock() - data, err := json.Marshal(s) + data, err := json.Marshal(&s.data) if err != nil { return nil, err } @@ -532,20 +546,29 @@ func (s *store) Restore(snapshot io.ReadCloser) error { s.lock.Lock() defer s.lock.Unlock() + data := storeData{} + data.init() + dec := json.NewDecoder(snapshot) - if err := dec.Decode(s); err != nil { + if err := dec.Decode(&data); err != nil { return err } - for id, p := range s.Process { + for id, p := range data.Process { if p.Metadata != nil { continue } p.Metadata = map[string]interface{}{} - s.Process[id] = p + data.Process[id] = p } + if data.Version == 0 { + data.Version = 1 + } + + s.data = data + return nil } @@ -555,7 +578,7 @@ func (s *store) ListProcesses() []Process { processes := []Process{} - for _, p := range s.Process { + for _, p := range s.data.Process { processes = append(processes, Process{ CreatedAt: p.CreatedAt, UpdatedAt: p.UpdatedAt, @@ -571,7 +594,7 @@ func (s *store) GetProcess(id app.ProcessID) (Process, error) { s.lock.RLock() defer s.lock.RUnlock() - process, ok := s.Process[id.String()] + process, ok := s.data.Process[id.String()] if !ok { return Process{}, fmt.Errorf("not found") } @@ -589,10 +612,10 @@ func (s *store) ListUsers() Users { defer s.lock.RUnlock() u := Users{ - UpdatedAt: s.Users.UpdatedAt, + UpdatedAt: s.data.Users.UpdatedAt, } - for _, user := range s.Users.Users { + for _, user := range s.data.Users.Users { u.Users = append(u.Users, user) } @@ -604,10 +627,10 @@ func (s *store) GetUser(name string) Users { defer s.lock.RUnlock() u := Users{ - UpdatedAt: s.Users.UpdatedAt, + UpdatedAt: s.data.Users.UpdatedAt, } - if user, ok := s.Users.Users[name]; ok { + if user, ok := s.data.Users.Users[name]; ok { u.Users = append(u.Users, user) } @@ -619,10 +642,10 @@ func (s *store) ListPolicies() Policies { defer s.lock.RUnlock() p := Policies{ - UpdatedAt: s.Policies.UpdatedAt, + UpdatedAt: s.data.Policies.UpdatedAt, } - for _, policies := range s.Policies.Policies { + for _, policies := range s.data.Policies.Policies { p.Policies = append(p.Policies, policies...) } @@ -634,10 +657,10 @@ func (s *store) ListUserPolicies(name string) Policies { defer s.lock.RUnlock() p := Policies{ - UpdatedAt: s.Policies.UpdatedAt, + UpdatedAt: s.data.Policies.UpdatedAt, } - p.Policies = append(p.Policies, s.Policies.Policies[name]...) + p.Policies = append(p.Policies, s.data.Policies.Policies[name]...) return p } @@ -648,7 +671,7 @@ func (s *store) GetProcessNodeMap() map[string]string { m := map[string]string{} - for key, value := range s.ProcessNodeMap { + for key, value := range s.data.ProcessNodeMap { m[key] = value } diff --git a/cluster/store/store_test.go b/cluster/store/store_test.go index a3e38236..c4696e5b 100644 --- a/cluster/store/store_test.go +++ b/cluster/store/store_test.go @@ -29,10 +29,10 @@ func TestCreateStore(t *testing.T) { s, err := createStore() require.NoError(t, err) - require.NotNil(t, s.Process) - require.NotNil(t, s.ProcessNodeMap) - require.NotNil(t, s.Users.Users) - require.NotNil(t, s.Policies.Policies) + require.NotNil(t, s.data.Process) + require.NotNil(t, s.data.ProcessNodeMap) + require.NotNil(t, s.data.Users.Users) + require.NotNil(t, s.data.Policies.Policies) } func TestAddProcessCommand(t *testing.T) { @@ -52,9 +52,9 @@ func TestAddProcessCommand(t *testing.T) { }, }) require.NoError(t, err) - require.NotEmpty(t, s.Process) + require.NotEmpty(t, s.data.Process) - p, ok := s.Process["foobar@"] + p, ok := s.data.Process["foobar@"] require.True(t, ok) require.NotZero(t, p.CreatedAt) @@ -76,7 +76,7 @@ func TestAddProcess(t *testing.T) { Config: config, }) require.Error(t, err) - require.Empty(t, s.Process) + require.Empty(t, s.data.Process) config = &app.Config{ ID: "foobar", @@ -88,7 +88,7 @@ func TestAddProcess(t *testing.T) { Config: config, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) config = &app.Config{ ID: "foobar", @@ -100,7 +100,7 @@ func TestAddProcess(t *testing.T) { Config: config, }) require.Error(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) config = &app.Config{ ID: "foobar", @@ -113,7 +113,7 @@ func TestAddProcess(t *testing.T) { Config: config, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) } func TestRemoveProcessCommand(t *testing.T) { @@ -133,7 +133,7 @@ func TestRemoveProcessCommand(t *testing.T) { }, }) require.NoError(t, err) - require.NotEmpty(t, s.Process) + require.NotEmpty(t, s.data.Process) err = s.applyCommand(Command{ Operation: OpRemoveProcess, @@ -142,7 +142,7 @@ func TestRemoveProcessCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Empty(t, s.Process) + require.Empty(t, s.data.Process) err = s.applyCommand(Command{ Operation: OpRemoveProcess, @@ -184,25 +184,25 @@ func TestRemoveProcess(t *testing.T) { Config: config1, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) err = s.addProcess(CommandAddProcess{ Config: config2, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) err = s.removeProcess(CommandRemoveProcess{ ID: config1.ProcessID(), }) require.NoError(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) err = s.removeProcess(CommandRemoveProcess{ ID: config2.ProcessID(), }) require.NoError(t, err) - require.Empty(t, s.Process) + require.Empty(t, s.data.Process) } func TestUpdateProcessCommand(t *testing.T) { @@ -224,7 +224,7 @@ func TestUpdateProcessCommand(t *testing.T) { }, }) require.NoError(t, err) - require.NotEmpty(t, s.Process) + require.NotEmpty(t, s.data.Process) config = &app.Config{ ID: "foobaz", @@ -240,7 +240,7 @@ func TestUpdateProcessCommand(t *testing.T) { }, }) require.NoError(t, err) - require.NotEmpty(t, s.Process) + require.NotEmpty(t, s.data.Process) } func TestUpdateProcess(t *testing.T) { @@ -263,13 +263,13 @@ func TestUpdateProcess(t *testing.T) { Config: config1, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) err = s.addProcess(CommandAddProcess{ Config: config2, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) config := &app.Config{ ID: "foobaz", @@ -307,14 +307,14 @@ func TestUpdateProcess(t *testing.T) { Config: config, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) err = s.updateProcess(CommandUpdateProcess{ ID: config.ProcessID(), Config: config, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) config3 := &app.Config{ ID: config.ID, @@ -327,7 +327,7 @@ func TestUpdateProcess(t *testing.T) { Config: config3, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Process)) + require.Equal(t, 2, len(s.data.Process)) _, err = s.GetProcess(config1.ProcessID()) require.Error(t, err) @@ -356,7 +356,7 @@ func TestSetProcessMetadataCommand(t *testing.T) { }, }) require.NoError(t, err) - require.NotEmpty(t, s.Process) + require.NotEmpty(t, s.data.Process) p, err := s.GetProcess(config.ProcessID()) require.NoError(t, err) @@ -403,7 +403,7 @@ func TestSetProcessMetadata(t *testing.T) { Config: config, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Process)) + require.Equal(t, 1, len(s.data.Process)) err = s.setProcessMetadata(CommandSetProcessMetadata{ ID: config.ProcessID(), @@ -471,7 +471,7 @@ func TestAddIdentityCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) + require.Equal(t, 1, len(s.data.Users.Users)) } func TestAddIdentity(t *testing.T) { @@ -486,15 +486,15 @@ func TestAddIdentity(t *testing.T) { Identity: identity, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) err = s.addIdentity(CommandAddIdentity{ Identity: identity, }) require.Error(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) } func TestRemoveIdentityCommand(t *testing.T) { @@ -512,7 +512,7 @@ func TestRemoveIdentityCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) + require.Equal(t, 1, len(s.data.Users.Users)) err = s.applyCommand(Command{ Operation: OpRemoveIdentity, @@ -521,7 +521,7 @@ func TestRemoveIdentityCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 0, len(s.Users.Users)) + require.Equal(t, 0, len(s.data.Users.Users)) } func TestRemoveIdentity(t *testing.T) { @@ -536,22 +536,22 @@ func TestRemoveIdentity(t *testing.T) { Identity: identity, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) err = s.removeIdentity(CommandRemoveIdentity{ Name: "foobar", }) require.NoError(t, err) - require.Equal(t, 0, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 0, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) err = s.removeIdentity(CommandRemoveIdentity{ Name: "foobar", }) require.NoError(t, err) - require.Equal(t, 0, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 0, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) } func TestSetPoliciesCommand(t *testing.T) { @@ -569,8 +569,8 @@ func TestSetPoliciesCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) err = s.applyCommand(Command{ Operation: OpSetPolicies, @@ -593,8 +593,8 @@ func TestSetPoliciesCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 2, len(s.Policies.Policies["foobar"])) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 2, len(s.data.Policies.Policies["foobar"])) } func TestSetPolicies(t *testing.T) { @@ -625,22 +625,22 @@ func TestSetPolicies(t *testing.T) { Policies: policies, }) require.Error(t, err) - require.Equal(t, 0, len(s.Policies.Policies["foobar"])) + require.Equal(t, 0, len(s.data.Policies.Policies["foobar"])) err = s.addIdentity(CommandAddIdentity{ Identity: identity, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies)) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies)) err = s.setPolicies(CommandSetPolicies{ Name: "foobar", Policies: policies, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 2, len(s.Policies.Policies["foobar"])) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 2, len(s.data.Policies.Policies["foobar"])) } func TestUpdateUserCommand(t *testing.T) { @@ -662,7 +662,7 @@ func TestUpdateUserCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) + require.Equal(t, 1, len(s.data.Users.Users)) err = s.applyCommand(Command{ Operation: OpAddIdentity, @@ -671,7 +671,7 @@ func TestUpdateUserCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) err = s.applyCommand(Command{ Operation: OpUpdateIdentity, @@ -683,7 +683,7 @@ func TestUpdateUserCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) } func TestUpdateIdentity(t *testing.T) { @@ -702,13 +702,13 @@ func TestUpdateIdentity(t *testing.T) { Identity: idty1, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) + require.Equal(t, 1, len(s.data.Users.Users)) err = s.addIdentity(CommandAddIdentity{ Identity: idty2, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) idty := identity.User{ Name: "foobaz", @@ -719,7 +719,7 @@ func TestUpdateIdentity(t *testing.T) { Identity: idty, }) require.Error(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) idty.Name = "foobar2" @@ -728,7 +728,7 @@ func TestUpdateIdentity(t *testing.T) { Identity: idty, }) require.Error(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) idty.Name = "foobaz" @@ -737,7 +737,7 @@ func TestUpdateIdentity(t *testing.T) { Identity: idty, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Users.Users)) + require.Equal(t, 2, len(s.data.Users.Users)) u := s.GetUser("foobar1") require.Empty(t, u.Users) @@ -776,22 +776,22 @@ func TestUpdateIdentityWithPolicies(t *testing.T) { Identity: idty1, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) + require.Equal(t, 1, len(s.data.Users.Users)) err = s.setPolicies(CommandSetPolicies{ Name: "foobar", Policies: policies, }) require.NoError(t, err) - require.Equal(t, 2, len(s.Policies.Policies["foobar"])) + require.Equal(t, 2, len(s.data.Policies.Policies["foobar"])) err = s.updateIdentity(CommandUpdateIdentity{ Name: "foobar", Identity: idty1, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 2, len(s.Policies.Policies["foobar"])) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 2, len(s.data.Policies.Policies["foobar"])) idty2 := identity.User{ Name: "foobaz", @@ -802,9 +802,9 @@ func TestUpdateIdentityWithPolicies(t *testing.T) { Identity: idty2, }) require.NoError(t, err) - require.Equal(t, 1, len(s.Users.Users)) - require.Equal(t, 0, len(s.Policies.Policies["foobar"])) - require.Equal(t, 2, len(s.Policies.Policies["foobaz"])) + require.Equal(t, 1, len(s.data.Users.Users)) + require.Equal(t, 0, len(s.data.Policies.Policies["foobar"])) + require.Equal(t, 2, len(s.data.Policies.Policies["foobaz"])) } func TestSetProcessNodeMapCommand(t *testing.T) { @@ -822,7 +822,7 @@ func TestSetProcessNodeMapCommand(t *testing.T) { }, }) require.NoError(t, err) - require.Equal(t, m1, s.ProcessNodeMap) + require.Equal(t, m1, s.data.ProcessNodeMap) } func TestSetProcessNodeMap(t *testing.T) { @@ -837,7 +837,7 @@ func TestSetProcessNodeMap(t *testing.T) { Map: m1, }) require.NoError(t, err) - require.Equal(t, m1, s.ProcessNodeMap) + require.Equal(t, m1, s.data.ProcessNodeMap) m2 := map[string]string{ "key": "value2", @@ -847,7 +847,7 @@ func TestSetProcessNodeMap(t *testing.T) { Map: m2, }) require.NoError(t, err) - require.Equal(t, m2, s.ProcessNodeMap) + require.Equal(t, m2, s.data.ProcessNodeMap) m := s.GetProcessNodeMap() require.Equal(t, m2, m) diff --git a/cluster/version.go b/cluster/version.go new file mode 100644 index 00000000..5d0de6c5 --- /dev/null +++ b/cluster/version.go @@ -0,0 +1,43 @@ +package cluster + +import ( + "fmt" + + "github.com/Masterminds/semver/v3" +) + +type ClusterVersion struct { + Major uint64 + Minor uint64 + Patch uint64 +} + +func (v ClusterVersion) String() string { + return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch) +} + +func (v ClusterVersion) Equal(x ClusterVersion) bool { + return v.String() == x.String() +} + +func ParseClusterVersion(version string) (ClusterVersion, error) { + v := ClusterVersion{} + + sv, err := semver.NewVersion(version) + if err != nil { + return v, err + } + + v.Major = sv.Major() + v.Minor = sv.Minor() + v.Patch = sv.Patch() + + return v, nil +} + +// Version of the cluster +var Version = ClusterVersion{ + Major: 1, + Minor: 0, + Patch: 0, +} diff --git a/docs/docs.go b/docs/docs.go index 5ad679bb..8367ebce 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -3782,17 +3782,23 @@ const docTemplate = `{ "core_api_address": { "type": "string" }, + "degraded": { + "type": "boolean" + }, "id": { "type": "string" }, - "server": { + "nodes": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterServer" + "$ref": "#/definitions/api.ClusterNode" } }, - "stats": { - "$ref": "#/definitions/api.ClusterStats" + "raft": { + "$ref": "#/definitions/api.ClusterRaft" + }, + "version": { + "type": "string" } } }, @@ -3827,6 +3833,9 @@ const docTemplate = `{ }, "uptime_seconds": { "type": "integer" + }, + "version": { + "type": "string" } } }, @@ -3863,9 +3872,11 @@ const docTemplate = `{ "type": "boolean" }, "memory_limit_bytes": { + "description": "bytes", "type": "integer" }, "memory_used_bytes": { + "description": "bytes", "type": "integer" }, "ncpu": { @@ -3877,6 +3888,7 @@ const docTemplate = `{ "type": "object", "properties": { "cpu": { + "description": "percent 0-100*ncpu", "type": "number" }, "domain": { @@ -3886,6 +3898,7 @@ const docTemplate = `{ "type": "string" }, "memory_bytes": { + "description": "bytes", "type": "integer" }, "node_id": { @@ -3901,6 +3914,7 @@ const docTemplate = `{ "type": "string" }, "runtime_seconds": { + "description": "seconds", "type": "integer" }, "state": { @@ -3908,7 +3922,21 @@ const docTemplate = `{ } } }, - "api.ClusterServer": { + "api.ClusterRaft": { + "type": "object", + "properties": { + "server": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterRaftServer" + } + }, + "stats": { + "$ref": "#/definitions/api.ClusterRaftStats" + } + } + }, + "api.ClusterRaftServer": { "type": "object", "properties": { "address": { @@ -3926,7 +3954,7 @@ const docTemplate = `{ } } }, - "api.ClusterStats": { + "api.ClusterRaftStats": { "type": "object", "properties": { "last_contact_ms": { @@ -4053,6 +4081,7 @@ const docTemplate = `{ "type": "object", "properties": { "address": { + "description": "ip:port", "type": "string" }, "bootstrap": { @@ -4061,9 +4090,19 @@ const docTemplate = `{ "debug": { "type": "boolean" }, + "emergency_leader_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "enable": { "type": "boolean" }, + "node_recover_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "peers": { "type": "array", "items": { @@ -4072,6 +4111,11 @@ const docTemplate = `{ }, "recover": { "type": "boolean" + }, + "sync_interval": { + "description": "seconds", + "type": "integer", + "format": "int64" } } }, @@ -4261,9 +4305,11 @@ const docTemplate = `{ "type": "object", "properties": { "max_cpu_usage": { + "description": "percent 0-100", "type": "number" }, "max_memory_usage": { + "description": "percent 0-100", "type": "number" } } @@ -6225,6 +6271,7 @@ const docTemplate = `{ "type": "object", "properties": { "address": { + "description": "ip:port", "type": "string" }, "bootstrap": { @@ -6233,9 +6280,19 @@ const docTemplate = `{ "debug": { "type": "boolean" }, + "emergency_leader_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "enable": { "type": "boolean" }, + "node_recover_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "peers": { "type": "array", "items": { @@ -6244,6 +6301,11 @@ const docTemplate = `{ }, "recover": { "type": "boolean" + }, + "sync_interval": { + "description": "seconds", + "type": "integer", + "format": "int64" } } }, @@ -6433,9 +6495,11 @@ const docTemplate = `{ "type": "object", "properties": { "max_cpu_usage": { + "description": "percent 0-100", "type": "number" }, "max_memory_usage": { + "description": "percent 0-100", "type": "number" } } diff --git a/docs/swagger.json b/docs/swagger.json index ef67e421..1e89694e 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3774,17 +3774,23 @@ "core_api_address": { "type": "string" }, + "degraded": { + "type": "boolean" + }, "id": { "type": "string" }, - "server": { + "nodes": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterServer" + "$ref": "#/definitions/api.ClusterNode" } }, - "stats": { - "$ref": "#/definitions/api.ClusterStats" + "raft": { + "$ref": "#/definitions/api.ClusterRaft" + }, + "version": { + "type": "string" } } }, @@ -3819,6 +3825,9 @@ }, "uptime_seconds": { "type": "integer" + }, + "version": { + "type": "string" } } }, @@ -3855,9 +3864,11 @@ "type": "boolean" }, "memory_limit_bytes": { + "description": "bytes", "type": "integer" }, "memory_used_bytes": { + "description": "bytes", "type": "integer" }, "ncpu": { @@ -3869,6 +3880,7 @@ "type": "object", "properties": { "cpu": { + "description": "percent 0-100*ncpu", "type": "number" }, "domain": { @@ -3878,6 +3890,7 @@ "type": "string" }, "memory_bytes": { + "description": "bytes", "type": "integer" }, "node_id": { @@ -3893,6 +3906,7 @@ "type": "string" }, "runtime_seconds": { + "description": "seconds", "type": "integer" }, "state": { @@ -3900,7 +3914,21 @@ } } }, - "api.ClusterServer": { + "api.ClusterRaft": { + "type": "object", + "properties": { + "server": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterRaftServer" + } + }, + "stats": { + "$ref": "#/definitions/api.ClusterRaftStats" + } + } + }, + "api.ClusterRaftServer": { "type": "object", "properties": { "address": { @@ -3918,7 +3946,7 @@ } } }, - "api.ClusterStats": { + "api.ClusterRaftStats": { "type": "object", "properties": { "last_contact_ms": { @@ -4045,6 +4073,7 @@ "type": "object", "properties": { "address": { + "description": "ip:port", "type": "string" }, "bootstrap": { @@ -4053,9 +4082,19 @@ "debug": { "type": "boolean" }, + "emergency_leader_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "enable": { "type": "boolean" }, + "node_recover_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "peers": { "type": "array", "items": { @@ -4064,6 +4103,11 @@ }, "recover": { "type": "boolean" + }, + "sync_interval": { + "description": "seconds", + "type": "integer", + "format": "int64" } } }, @@ -4253,9 +4297,11 @@ "type": "object", "properties": { "max_cpu_usage": { + "description": "percent 0-100", "type": "number" }, "max_memory_usage": { + "description": "percent 0-100", "type": "number" } } @@ -6217,6 +6263,7 @@ "type": "object", "properties": { "address": { + "description": "ip:port", "type": "string" }, "bootstrap": { @@ -6225,9 +6272,19 @@ "debug": { "type": "boolean" }, + "emergency_leader_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "enable": { "type": "boolean" }, + "node_recover_timeout": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "peers": { "type": "array", "items": { @@ -6236,6 +6293,11 @@ }, "recover": { "type": "boolean" + }, + "sync_interval": { + "description": "seconds", + "type": "integer", + "format": "int64" } } }, @@ -6425,9 +6487,11 @@ "type": "object", "properties": { "max_cpu_usage": { + "description": "percent 0-100", "type": "number" }, "max_memory_usage": { + "description": "percent 0-100", "type": "number" } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 03db6bb6..2a664f8d 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -73,14 +73,18 @@ definitions: type: string core_api_address: type: string + degraded: + type: boolean id: type: string - server: + nodes: items: - $ref: '#/definitions/api.ClusterServer' + $ref: '#/definitions/api.ClusterNode' type: array - stats: - $ref: '#/definitions/api.ClusterStats' + raft: + $ref: '#/definitions/api.ClusterRaft' + version: + type: string type: object api.ClusterNode: properties: @@ -104,6 +108,8 @@ definitions: type: string uptime_seconds: type: integer + version: + type: string type: object api.ClusterNodeFiles: properties: @@ -128,8 +134,10 @@ definitions: is_throttling: type: boolean memory_limit_bytes: + description: bytes type: integer memory_used_bytes: + description: bytes type: integer ncpu: type: number @@ -137,12 +145,14 @@ definitions: api.ClusterProcess: properties: cpu: + description: percent 0-100*ncpu type: number domain: type: string id: type: string memory_bytes: + description: bytes type: integer node_id: type: string @@ -153,11 +163,21 @@ definitions: reference: type: string runtime_seconds: + description: seconds type: integer state: type: string type: object - api.ClusterServer: + api.ClusterRaft: + properties: + server: + items: + $ref: '#/definitions/api.ClusterRaftServer' + type: array + stats: + $ref: '#/definitions/api.ClusterRaftStats' + type: object + api.ClusterRaftServer: properties: address: description: raft address @@ -169,7 +189,7 @@ definitions: voter: type: boolean type: object - api.ClusterStats: + api.ClusterRaftStats: properties: last_contact_ms: type: number @@ -252,19 +272,32 @@ definitions: cluster: properties: address: + description: ip:port type: string bootstrap: type: boolean debug: type: boolean + emergency_leader_timeout: + description: seconds + format: int64 + type: integer enable: type: boolean + node_recover_timeout: + description: seconds + format: int64 + type: integer peers: items: type: string type: array recover: type: boolean + sync_interval: + description: seconds + format: int64 + type: integer type: object created_at: description: When this config has been persisted @@ -393,8 +426,10 @@ definitions: resources: properties: max_cpu_usage: + description: percent 0-100 type: number max_memory_usage: + description: percent 0-100 type: number type: object router: @@ -1783,19 +1818,32 @@ definitions: cluster: properties: address: + description: ip:port type: string bootstrap: type: boolean debug: type: boolean + emergency_leader_timeout: + description: seconds + format: int64 + type: integer enable: type: boolean + node_recover_timeout: + description: seconds + format: int64 + type: integer peers: items: type: string type: array recover: type: boolean + sync_interval: + description: seconds + format: int64 + type: integer type: object created_at: description: When this config has been persisted @@ -1924,8 +1972,10 @@ definitions: resources: properties: max_cpu_usage: + description: percent 0-100 type: number max_memory_usage: + description: percent 0-100 type: number type: object router: diff --git a/http/api/cluster.go b/http/api/cluster.go index f81c9e23..56253c23 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -1,6 +1,11 @@ package api -import "encoding/json" +import ( + "encoding/json" + "time" + + "github.com/datarhei/core/v16/cluster/proxy" +) type ClusterNode struct { ID string `json:"id"` @@ -12,15 +17,28 @@ type ClusterNode struct { Latency float64 `json:"latency_ms"` // milliseconds State string `json:"state"` Resources ClusterNodeResources `json:"resources"` + Version string `json:"version"` +} + +func (n *ClusterNode) Marshal(about proxy.NodeAbout) { + n.ID = about.ID + n.Name = about.Name + n.Address = about.Address + n.CreatedAt = about.CreatedAt.Format(time.RFC3339) + n.Uptime = int64(about.Uptime.Seconds()) + n.LastContact = about.LastContact.Unix() + n.Latency = about.Latency.Seconds() * 1000 + n.State = about.State + n.Resources = ClusterNodeResources(about.Resources) } type ClusterNodeResources struct { IsThrottling bool `json:"is_throttling"` NCPU float64 `json:"ncpu"` - CPU float64 `json:"cpu_used"` // percent 0-100*npcu - CPULimit float64 `json:"cpu_limit"` // percent 0-100*npcu - Mem uint64 `json:"memory_used_bytes"` - MemLimit uint64 `json:"memory_limit_bytes"` + CPU float64 `json:"cpu_used"` // percent 0-100*npcu + CPULimit float64 `json:"cpu_limit"` // percent 0-100*npcu + Mem uint64 `json:"memory_used_bytes"` // bytes + MemLimit uint64 `json:"memory_limit_bytes"` // bytes } type ClusterNodeFiles struct { @@ -28,13 +46,35 @@ type ClusterNodeFiles struct { Files map[string][]string `json:"files"` } -type ClusterServer struct { +type ClusterRaftServer struct { ID string `json:"id"` Address string `json:"address"` // raft address Voter bool `json:"voter"` Leader bool `json:"leader"` } +type ClusterRaftStats struct { + State string `json:"state"` + LastContact float64 `json:"last_contact_ms"` + NumPeers uint64 `json:"num_peers"` +} + +type ClusterRaft struct { + Server []ClusterRaftServer `json:"server"` + Stats ClusterRaftStats `json:"stats"` +} + +type ClusterAbout struct { + ID string `json:"id"` + Address string `json:"address"` + ClusterAPIAddress string `json:"cluster_api_address"` + CoreAPIAddress string `json:"core_api_address"` + Raft ClusterRaft `json:"raft"` + Nodes []ClusterNode `json:"nodes"` + Version string `json:"version"` + Degraded bool `json:"degraded"` +} + type ClusterProcess struct { ID string `json:"id"` Owner string `json:"owner"` @@ -43,22 +83,7 @@ type ClusterProcess struct { Reference string `json:"reference"` Order string `json:"order"` State string `json:"state"` - CPU json.Number `json:"cpu" swaggertype:"number" jsonschema:"type=number"` - Memory uint64 `json:"memory_bytes"` - Runtime int64 `json:"runtime_seconds"` -} - -type ClusterStats struct { - State string `json:"state"` - LastContact float64 `json:"last_contact_ms"` - NumPeers uint64 `json:"num_peers"` -} - -type ClusterAbout struct { - ID string `json:"id"` - Address string `json:"address"` - ClusterAPIAddress string `json:"cluster_api_address"` - CoreAPIAddress string `json:"core_api_address"` - Server []ClusterServer `json:"server"` - Stats ClusterStats `json:"stats"` + CPU json.Number `json:"cpu" swaggertype:"number" jsonschema:"type=number"` // percent 0-100*ncpu + Memory uint64 `json:"memory_bytes"` // bytes + Runtime int64 `json:"runtime_seconds"` // seconds } diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 4fd2ca3b..62d793b3 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -65,16 +65,20 @@ func (h *ClusterHandler) About(c echo.Context) error { Address: state.Address, ClusterAPIAddress: state.ClusterAPIAddress, CoreAPIAddress: state.CoreAPIAddress, - Server: []api.ClusterServer{}, - Stats: api.ClusterStats{ - State: state.Stats.State, - LastContact: state.Stats.LastContact.Seconds() * 1000, - NumPeers: state.Stats.NumPeers, + Raft: api.ClusterRaft{ + Server: []api.ClusterRaftServer{}, + Stats: api.ClusterRaftStats{ + State: state.Raft.Stats.State, + LastContact: state.Raft.Stats.LastContact.Seconds() * 1000, + NumPeers: state.Raft.Stats.NumPeers, + }, }, + Version: state.Version.String(), + Degraded: state.Degraded, } - for _, n := range state.Nodes { - about.Server = append(about.Server, api.ClusterServer{ + for _, n := range state.Raft.Server { + about.Raft.Server = append(about.Raft.Server, api.ClusterRaftServer{ ID: n.ID, Address: n.Address, Voter: n.Voter, @@ -82,6 +86,13 @@ func (h *ClusterHandler) About(c echo.Context) error { }) } + for _, node := range state.Nodes { + n := api.ClusterNode{} + n.Marshal(node) + + about.Nodes = append(about.Nodes, n) + } + return c.JSON(http.StatusOK, about) } @@ -159,17 +170,8 @@ func (h *ClusterHandler) GetNodes(c echo.Context) error { for _, node := range nodes { about := node.About() - n := api.ClusterNode{ - ID: about.ID, - Name: about.Name, - Address: about.Address, - CreatedAt: about.CreatedAt.Format(time.RFC3339), - Uptime: int64(about.Uptime.Seconds()), - LastContact: about.LastContact.Unix(), - Latency: about.Latency.Seconds() * 1000, - State: about.State, - Resources: api.ClusterNodeResources(about.Resources), - } + n := api.ClusterNode{} + n.Marshal(about) list = append(list, n) }