Add package cluster :

- based on dragonboat lib (https://github.com/lni/dragonboat)
- add extended config with validate
- add backend managment to allow simple implementation for backend simple, cocurrent & disk
- has no backend implemented and must be a lib for a cluster mode of one backend
This commit is contained in:
Nicolas JUHEL
2021-04-12 18:02:22 +02:00
parent 3b7ac900b5
commit c083ca1d08
13 changed files with 1966 additions and 0 deletions

82
cluster/async.go Normal file
View File

@@ -0,0 +1,82 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
dgbclt "github.com/lni/dragonboat/v3"
dgbcli "github.com/lni/dragonboat/v3/client"
liberr "github.com/nabbar/golib/errors"
)
func (c *cRaft) AsyncPropose(session *dgbcli.Session, cmd []byte) (*dgbclt.RequestState, liberr.Error) {
r, e := c.nodeHost.Propose(session, cmd, c.timeoutCmdASync)
if e != nil {
return r, ErrorCommandASync.ErrorParent(c.getErrorCommand("Propose"), e)
}
return r, nil
}
func (c *cRaft) AsyncProposeSession(session *dgbcli.Session) (*dgbclt.RequestState, liberr.Error) {
r, e := c.nodeHost.ProposeSession(session, c.timeoutCmdASync)
if e != nil {
return r, ErrorCommandASync.ErrorParent(c.getErrorCommand("ProposeSession"), e)
}
return r, nil
}
func (c *cRaft) AsyncReadIndex() (*dgbclt.RequestState, liberr.Error) {
r, e := c.nodeHost.ReadIndex(c.config.ClusterID, c.timeoutCmdASync)
if e != nil {
return r, ErrorCommandASync.ErrorParent(c.getErrorCluster(), c.getErrorCommand("ReadIndex"), e)
}
return r, nil
}
func (c *cRaft) AsyncRequestCompaction(nodeID uint64) (*dgbclt.SysOpState, liberr.Error) {
var er error
if nodeID == 0 {
nodeID = c.config.NodeID
er = c.getErrorNode()
} else {
er = c.getErrorNodeTarget(nodeID)
}
r, e := c.nodeHost.RequestCompaction(c.config.ClusterID, nodeID)
if e != nil {
return r, ErrorCommandASync.ErrorParent(c.getErrorCluster(), er, c.getErrorCommand("RequestCompaction"), e)
}
return r, nil
}

290
cluster/cluster.go Normal file
View File

@@ -0,0 +1,290 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"time"
dgbclt "github.com/lni/dragonboat/v3"
dgbcli "github.com/lni/dragonboat/v3/client"
dgbcfg "github.com/lni/dragonboat/v3/config"
dgbstm "github.com/lni/dragonboat/v3/statemachine"
liberr "github.com/nabbar/golib/errors"
)
type cRaft struct {
memberInit map[uint64]dgbclt.Target
fctCreate interface{}
config dgbcfg.Config
nodeHost *dgbclt.NodeHost
timeoutCmdSync time.Duration
timeoutCmdASync time.Duration
}
func (c *cRaft) getErrorCluster() error {
//nolint #goerr113
return fmt.Errorf("cluster: %v", c.config.ClusterID)
}
func (c *cRaft) getErrorNode() error {
//nolint #goerr113
return fmt.Errorf("node: %v", c.config.NodeID)
}
func (c *cRaft) getErrorNodeTarget(target uint64) error {
//nolint #goerr113
return fmt.Errorf("target node: %v", target)
}
func (c *cRaft) getErrorCommand(cmd string) error {
//nolint #goerr113
return fmt.Errorf("command: %v", cmd)
}
func (c *cRaft) GetConfig() dgbcfg.Config {
return c.config
}
func (c *cRaft) SetConfig(cfg dgbcfg.Config) {
c.config = cfg
}
func (c *cRaft) GetFctCreate() dgbstm.CreateStateMachineFunc {
if f, ok := c.fctCreate.(dgbstm.CreateStateMachineFunc); ok {
return f
}
return nil
}
func (c *cRaft) GetFctCreateConcurrent() dgbstm.CreateConcurrentStateMachineFunc {
if f, ok := c.fctCreate.(dgbstm.CreateConcurrentStateMachineFunc); ok {
return f
}
return nil
}
func (c *cRaft) GetFctCreateOnDisk() dgbstm.CreateOnDiskStateMachineFunc {
if f, ok := c.fctCreate.(dgbstm.CreateOnDiskStateMachineFunc); ok {
return f
}
return nil
}
func (c *cRaft) SetFctCreate(fctCreate interface{}) {
c.fctCreate = fctCreate
}
func (c *cRaft) SetFctCreateSTM(fctCreate dgbstm.CreateStateMachineFunc) {
c.fctCreate = fctCreate
}
func (c *cRaft) SetFctCreateSTMConcurrent(fctCreate dgbstm.CreateConcurrentStateMachineFunc) {
c.fctCreate = fctCreate
}
func (c *cRaft) SetFctCreateSTMOnDisk(fctCreate dgbstm.CreateOnDiskStateMachineFunc) {
c.fctCreate = fctCreate
}
func (c *cRaft) GetMemberInit() map[uint64]dgbclt.Target {
return c.memberInit
}
func (c *cRaft) SetMemberInit(memberList map[uint64]dgbclt.Target) {
c.memberInit = memberList
}
func (c *cRaft) SetTimeoutCommandSync(timeout time.Duration) {
c.timeoutCmdSync = timeout
}
func (c *cRaft) SetTimeoutCommandASync(timeout time.Duration) {
c.timeoutCmdASync = timeout
}
func (c *cRaft) GetNodeHostConfig() dgbcfg.NodeHostConfig {
return c.nodeHost.NodeHostConfig()
}
func (c *cRaft) RaftAddress() string {
return c.nodeHost.RaftAddress()
}
func (c *cRaft) ID() string {
return c.nodeHost.ID()
}
func (c *cRaft) ClusterStart(join bool) liberr.Error {
err := ErrorNodeHostStart.Error(nil)
if join {
err = ErrorNodeHostJoin.Error(nil)
}
if f, ok := c.fctCreate.(dgbstm.CreateStateMachineFunc); ok {
err.AddParent(c.nodeHost.StartCluster(c.memberInit, join, f, c.config))
} else if f, ok := c.fctCreate.(dgbstm.CreateConcurrentStateMachineFunc); ok {
err.AddParent(c.nodeHost.StartConcurrentCluster(c.memberInit, join, f, c.config))
} else if f, ok := c.fctCreate.(dgbstm.CreateOnDiskStateMachineFunc); ok {
err.AddParent(c.nodeHost.StartOnDiskCluster(c.memberInit, join, f, c.config))
} else {
//nolint #goerr113
return ErrorParamsMismatching.ErrorParent(fmt.Errorf("create function is not one of type of CreateStateMachineFunc, CreateConcurrentStateMachineFunc, CreateOnDiskStateMachineFunc"))
}
if err.HasParent() {
return err
}
return nil
}
func (c *cRaft) ClusterStop(force bool) liberr.Error {
e := c.nodeHost.StopCluster(c.config.ClusterID)
if e != nil && !force {
return ErrorNodeHostStop.ErrorParent(c.getErrorCluster(), e)
}
c.nodeHost.Stop()
return nil
}
func (c *cRaft) ClusterRestart(force bool) liberr.Error {
if err := c.ClusterStop(force); err != nil {
return ErrorNodeHostRestart.Error(err)
}
return c.ClusterStart(false)
}
func (c *cRaft) NodeStop(target uint64) liberr.Error {
var en error
if target == 0 {
target = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(target)
}
e := c.nodeHost.StopNode(c.config.ClusterID, target)
if e != nil {
return ErrorNodeHostStop.ErrorParent(c.getErrorCluster(), en, e)
}
return nil
}
func (c *cRaft) NodeRestart(force bool) liberr.Error {
var join = false
if l, ok, err := c.GetLeaderID(); err == nil && ok && l == c.config.NodeID {
join = true
var sErr = ErrorNodeHostRestart.ErrorParent(c.getErrorCluster(), c.getErrorNode())
for id, nd := range c.memberInit {
if id == c.config.NodeID {
continue
}
if nd == c.RaftAddress() {
continue
}
if err = c.RequestLeaderTransfer(id); err == nil {
sErr.AddParentError(err)
break
}
}
if l, ok, err = c.GetLeaderID(); err == nil && ok && l == c.config.NodeID && !force {
return sErr
}
} else if err == nil && ok {
join = true
}
if err := c.NodeStop(0); err != nil && !force {
return ErrorNodeHostRestart.Error(err)
} else if err != nil && force {
join = false
_ = c.ClusterStop(true)
}
return c.ClusterStart(join)
}
func (c *cRaft) GetLeaderID() (leader uint64, valid bool, err liberr.Error) {
var e error
leader, valid, e = c.nodeHost.GetLeaderID(c.config.ClusterID)
if e != nil {
err = ErrorLeader.ErrorParent(c.getErrorCluster(), e)
}
return
}
func (c *cRaft) GetNoOPSession() *dgbcli.Session {
return c.nodeHost.GetNoOPSession(c.config.ClusterID)
}
func (c *cRaft) GetNodeUser() (dgbclt.INodeUser, liberr.Error) {
r, e := c.nodeHost.GetNodeUser(c.config.ClusterID)
if e != nil {
return nil, ErrorNodeUser.ErrorParent(c.getErrorCluster())
}
return r, nil
}
func (c *cRaft) HasNodeInfo(nodeId uint64) bool {
if nodeId == 0 {
nodeId = c.config.NodeID
}
return c.nodeHost.HasNodeInfo(c.config.ClusterID, nodeId)
}
func (c *cRaft) GetNodeHostInfo(opt dgbclt.NodeHostInfoOption) *dgbclt.NodeHostInfo {
return c.nodeHost.GetNodeHostInfo(opt)
}
func (c *cRaft) StaleReadDangerous(query interface{}) (interface{}, error) {
return c.nodeHost.StaleRead(c.config.ClusterID, query)
}
func (c *cRaft) RequestLeaderTransfer(targetNodeID uint64) liberr.Error {
e := c.nodeHost.RequestLeaderTransfer(c.config.ClusterID, targetNodeID)
if e != nil {
return ErrorLeaderTransfer.ErrorParent(c.getErrorCluster(), c.getErrorNodeTarget(targetNodeID), e)
}
return nil
}

83
cluster/config.go Normal file
View File

@@ -0,0 +1,83 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"github.com/go-playground/validator/v10"
dgbclt "github.com/lni/dragonboat/v3"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
type Config struct {
Node ConfigNode `mapstructure:"node" json:"node" yaml:"node" toml:"node"`
Cluster ConfigCluster `mapstructure:"cluster" json:"cluster" yaml:"cluster" toml:"cluster"`
InitMember map[uint64]string `mapstructure:"init_member" json:"init_member" yaml:"init_member" toml:"init_member"`
}
func (c Config) GetDGBConfigCluster() dgbcfg.Config {
return c.Cluster.GetDGBConfigCluster()
}
func (c Config) GetDGBConfigNode() dgbcfg.NodeHostConfig {
return c.Node.GetDGBConfigNodeHost()
}
func (c Config) GetInitMember() map[uint64]dgbclt.Target {
var m = make(map[uint64]dgbclt.Target)
for k, v := range c.InitMember {
m[k] = v
}
return m
}
func (c Config) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateConfig.ErrorParent(e)
}
out := ErrorValidateConfig.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

261
cluster/configCluster.go Normal file
View File

@@ -0,0 +1,261 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"github.com/go-playground/validator/v10"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
//nolint #maligned
type ConfigCluster struct {
// NodeID is a non-zero value used to identify a node within a Raft cluster.
NodeID uint64 `mapstructure:"node_id" json:"node_id" yaml:"node_id" toml:"node_id"`
// ClusterID is the unique value used to identify a Raft cluster.
ClusterID uint64 `mapstructure:"cluster_id" json:"cluster_id" yaml:"cluster_id" toml:"cluster_id"`
// CheckQuorum specifies whether the leader node should periodically check
// non-leader node status and step down to become a follower node when it no
// longer has the quorum.
CheckQuorum bool `mapstructure:"check_quorum" json:"check_quorum" yaml:"check_quorum" toml:"check_quorum"`
// ElectionRTT is the minimum number of message RTT between elections. Message
// RTT is defined by NodeHostConfig.RTTMillisecond. The Raft paper suggests it
// to be a magnitude greater than HeartbeatRTT, which is the interval between
// two heartbeats. In Raft, the actual interval between elections is
// randomized to be between ElectionRTT and 2 * ElectionRTT.
//
// As an example, assuming NodeHostConfig.RTTMillisecond is 100 millisecond,
// to set the election interval to be 1 second, then ElectionRTT should be set
// to 10.
//
// When CheckQuorum is enabled, ElectionRTT also defines the interval for
// checking leader quorum.
ElectionRTT uint64 `mapstructure:"election_rtt" json:"election_rtt" yaml:"election_rtt" toml:"election_rtt"`
// HeartbeatRTT is the number of message RTT between heartbeats. Message
// RTT is defined by NodeHostConfig.RTTMillisecond. The Raft paper suggest the
// heartbeat interval to be close to the average RTT between nodes.
//
// As an example, assuming NodeHostConfig.RTTMillisecond is 100 millisecond,
// to set the heartbeat interval to be every 200 milliseconds, then
// HeartbeatRTT should be set to 2.
HeartbeatRTT uint64 `mapstructure:"heartbeat_rtt" json:"heartbeat_rtt" yaml:"heartbeat_rtt" toml:"heartbeat_rtt"`
// SnapshotEntries defines how often the state machine should be snapshotted
// automcatically. It is defined in terms of the number of applied Raft log
// entries. SnapshotEntries can be set to 0 to disable such automatic
// snapshotting.
//
// When SnapshotEntries is set to N, it means a snapshot is created for
// roughly every N applied Raft log entries (proposals). This also implies
// that sending N log entries to a follower is more expensive than sending a
// snapshot.
//
// Once a snapshot is generated, Raft log entries covered by the new snapshot
// can be compacted. This involves two steps, redundant log entries are first
// marked as deleted, then they are physically removed from the underlying
// storage when a LogDB compaction is issued at a later stage. See the godoc
// on CompactionOverhead for details on what log entries are actually removed
// and compacted after generating a snapshot.
//
// Once automatic snapshotting is disabled by setting the SnapshotEntries
// field to 0, users can still use NodeHost's RequestSnapshot or
// SyncRequestSnapshot methods to manually request snapshots.
SnapshotEntries uint64 `mapstructure:"snapshot_entries" json:"snapshot_entries" yaml:"snapshot_entries" toml:"snapshot_entries"`
// CompactionOverhead defines the number of most recent entries to keep after
// each Raft log compaction. Raft log compaction is performance automatically
// every time when a snapshot is created.
//
// For example, when a snapshot is created at let's say index 10,000, then all
// Raft log entries with index <= 10,000 can be removed from that node as they
// have already been covered by the created snapshot image. This frees up the
// maximum storage space but comes at the cost that the full snapshot will
// have to be sent to the follower if the follower requires any Raft log entry
// at index <= 10,000. When CompactionOverhead is set to say 500, Dragonboat
// then compacts the Raft log up to index 9,500 and keeps Raft log entries
// between index (9,500, 1,0000]. As a result, the node can still replicate
// Raft log entries between index (9,500, 1,0000] to other peers and only fall
// back to stream the full snapshot if any Raft log entry with index <= 9,500
// is required to be replicated.
CompactionOverhead uint64 `mapstructure:"compaction_overhead" json:"compaction_overhead" yaml:"compaction_overhead" toml:"compaction_overhead"`
// OrderedConfigChange determines whether Raft membership change is enforced
// with ordered config change ID.
OrderedConfigChange bool `mapstructure:"ordered_config_change" json:"ordered_config_change" yaml:"ordered_config_change" toml:"ordered_config_change"`
// MaxInMemLogSize is the target size in bytes allowed for storing in memory
// Raft logs on each Raft node. In memory Raft logs are the ones that have
// not been applied yet.
// MaxInMemLogSize is a target value implemented to prevent unbounded memory
// growth, it is not for precisely limiting the exact memory usage.
// When MaxInMemLogSize is 0, the target is set to math.MaxUint64. When
// MaxInMemLogSize is set and the target is reached, error will be returned
// when clients try to make new proposals.
// MaxInMemLogSize is recommended to be significantly larger than the biggest
// proposal you are going to use.
MaxInMemLogSize uint64 `mapstructure:"max_in_mem_log_size" json:"max_in_mem_log_size" yaml:"max_in_mem_log_size" toml:"max_in_mem_log_size"`
// SnapshotCompressionType is the compression type to use for compressing
// generated snapshot data. No compression is used by default.
SnapshotCompressionType dgbcfg.CompressionType `mapstructure:"snapshot_compression" json:"snapshot_compression" yaml:"snapshot_compression" toml:"snapshot_compression"`
// EntryCompressionType is the compression type to use for compressing the
// payload of user proposals. When Snappy is used, the maximum proposal
// payload allowed is roughly limited to 3.42GBytes. No compression is used
// by default.
EntryCompressionType dgbcfg.CompressionType `mapstructure:"entry_compression" json:"entry_compression" yaml:"entry_compression" toml:"entry_compression"`
// DisableAutoCompactions disables auto compaction used for reclaiming Raft
// log entry storage spaces. By default, compaction request is issued every
// time when a snapshot is created, this helps to reclaim disk spaces as
// soon as possible at the cost of immediate higher IO overhead. Users can
// disable such auto compactions and use NodeHost.RequestCompaction to
// manually request such compactions when necessary.
DisableAutoCompactions bool `mapstructure:"disable_auto_compactions" json:"disable_auto_compactions" yaml:"disable_auto_compactions" toml:"disable_auto_compactions"`
// IsObserver indicates whether this is an observer Raft node without voting
// power. Described as non-voting members in the section 4.2.1 of Diego
// Ongaro's thesis, observer nodes are usually used to allow a new node to
// join the cluster and catch up with other existing ndoes without impacting
// the availability. Extra observer nodes can also be introduced to serve
// read-only requests without affecting system write throughput.
//
// Observer support is currently experimental.
IsObserver bool `mapstructure:"is_observer" json:"is_observer" yaml:"is_observer" toml:"is_observer"`
// IsWitness indicates whether this is a witness Raft node without actual log
// replication and do not have state machine. It is mentioned in the section
// 11.7.2 of Diego Ongaro's thesis.
//
// Witness support is currently experimental.
IsWitness bool `mapstructure:"is_witness" json:"is_witness" yaml:"is_witness" toml:"is_witness"`
// Quiesce specifies whether to let the Raft cluster enter quiesce mode when
// there is no cluster activity. Clusters in quiesce mode do not exchange
// heartbeat messages to minimize bandwidth consumption.
//
// Quiesce support is currently experimental.
Quiesce bool `mapstructure:"quiesce" json:"quiesce" yaml:"quiesce" toml:"quiesce"`
}
func (c ConfigCluster) GetDGBConfigCluster() dgbcfg.Config {
d := dgbcfg.Config{
NodeID: c.NodeID,
ClusterID: c.ClusterID,
SnapshotCompressionType: 0,
EntryCompressionType: 0,
}
if c.CheckQuorum {
d.CheckQuorum = true
}
if c.ElectionRTT > 0 {
d.ElectionRTT = c.ElectionRTT
}
if c.HeartbeatRTT > 0 {
d.HeartbeatRTT = c.HeartbeatRTT
}
if c.SnapshotEntries > 0 {
d.SnapshotEntries = c.SnapshotEntries
}
if c.CompactionOverhead > 0 {
d.CompactionOverhead = c.CompactionOverhead
}
if c.OrderedConfigChange {
d.OrderedConfigChange = true
}
if c.MaxInMemLogSize > 0 {
d.MaxInMemLogSize = c.MaxInMemLogSize
}
if c.DisableAutoCompactions {
d.DisableAutoCompactions = true
}
if c.IsObserver {
d.IsObserver = true
}
if c.IsWitness {
d.IsWitness = true
}
if c.Quiesce {
d.Quiesce = true
}
switch c.SnapshotCompressionType {
case dgbcfg.Snappy:
d.SnapshotCompressionType = dgbcfg.Snappy
default:
d.SnapshotCompressionType = dgbcfg.NoCompression
}
switch c.EntryCompressionType {
case dgbcfg.Snappy:
d.EntryCompressionType = dgbcfg.Snappy
default:
d.EntryCompressionType = dgbcfg.NoCompression
}
return d
}
func (c ConfigCluster) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateCluster.ErrorParent(e)
}
out := ErrorValidateCluster.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

107
cluster/configEngine.go Normal file
View File

@@ -0,0 +1,107 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"github.com/go-playground/validator/v10"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
type ConfigEngine struct {
// ExecShards is the number of execution shards in the first stage of the
// execution engine. Default value is 16. Once deployed, this value can not
// be changed later.
ExecShards uint64 `mapstructure:"exec_shards" json:"exec_shards" yaml:"exec_shards" toml:"exec_shards"`
// CommitShards is the number of commit shards in the second stage of the
// execution engine. Default value is 16.
CommitShards uint64 `mapstructure:"commit_shards" json:"commit_shards" yaml:"commit_shards" toml:"commit_shards"`
// ApplyShards is the number of apply shards in the third stage of the
// execution engine. Default value is 16.
ApplyShards uint64 `mapstructure:"apply_shards" json:"apply_shards" yaml:"apply_shards" toml:"apply_shards"`
// SnapshotShards is the number of snapshot shards in the forth stage of the
// execution engine. Default value is 48.
SnapshotShards uint64 `mapstructure:"snapshot_shards" json:"snapshot_shards" yaml:"snapshot_shards" toml:"snapshot_shards"`
// CloseShards is the number of close shards used for closing stopped
// state machines. Default value is 32.
CloseShards uint64 `mapstructure:"close_shards" json:"close_shards" yaml:"close_shards" toml:"close_shards"`
}
func (c ConfigEngine) GetDGBConfigEngine() dgbcfg.EngineConfig {
d := dgbcfg.EngineConfig{}
if c.ExecShards > 0 {
d.ExecShards = c.ExecShards
}
if c.CommitShards > 0 {
d.CommitShards = c.CommitShards
}
if c.ApplyShards > 0 {
d.ApplyShards = c.ApplyShards
}
if c.SnapshotShards > 0 {
d.SnapshotShards = c.SnapshotShards
}
if c.CloseShards > 0 {
d.CloseShards = c.CloseShards
}
return d
}
func (c ConfigEngine) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateEngine.ErrorParent(e)
}
out := ErrorValidateEngine.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

88
cluster/configExpert.go Normal file
View File

@@ -0,0 +1,88 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"time"
"github.com/go-playground/validator/v10"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
type ConfigExpert struct {
// Engine is the cponfiguration for the execution engine.
Engine ConfigEngine `mapstructure:"engine" json:"engine" yaml:"engine" toml:"engine"`
// TestNodeHostID is the NodeHostID value to be used by the NodeHost instance.
// This field is expected to be used in tests only.
TestNodeHostID uint64 `mapstructure:"test_node_host_id" json:"test_node_host_id" yaml:"test_node_host_id" toml:"test_node_host_id"`
// TestGossipProbeInterval define the probe interval used by the gossip
// service in tests.
TestGossipProbeInterval time.Duration `mapstructure:"test_gossip_probe_interval" json:"test_gossip_probe_interval" yaml:"test_gossip_probe_interval" toml:"test_gossip_probe_interval"`
}
func (c ConfigExpert) GetDGBConfigExpert() dgbcfg.ExpertConfig {
d := dgbcfg.ExpertConfig{
Engine: c.Engine.GetDGBConfigEngine(),
}
if c.TestNodeHostID > 0 {
d.TestNodeHostID = c.TestNodeHostID
}
if c.TestGossipProbeInterval > 0 {
d.TestGossipProbeInterval = c.TestGossipProbeInterval
}
return d
}
func (c ConfigExpert) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateExpert.ErrorParent(e)
}
out := ErrorValidateExpert.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

112
cluster/configGossip.go Normal file
View File

@@ -0,0 +1,112 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"github.com/go-playground/validator/v10"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
type ConfigGossip struct {
// BindAddress is the address for the gossip service to bind to and listen on.
// Both UDP and TCP ports are used by the gossip service. The local gossip
// service should be able to receive gossip service related messages by
// binding to and listening on this address. BindAddress is usually in the
// format of IP:Port, Hostname:Port or DNS Name:Port.
BindAddress string `mapstructure:"bind_address" json:"bind_address" yaml:"bind_address" toml:"bind_address"`
// AdvertiseAddress is the address to advertise to other NodeHost instances
// used for NAT traversal. Gossip services running on remote NodeHost
// instances will use AdvertiseAddress to exchange gossip service related
// messages. AdvertiseAddress is in the format of IP:Port.
AdvertiseAddress string `mapstructure:"advertise_address" json:"advertise_address" yaml:"advertise_address" toml:"advertise_address"`
// Seed is a list of AdvertiseAddress of remote NodeHost instances. Local
// NodeHost instance will try to contact all of them to bootstrap the gossip
// service. At least one reachable NodeHost instance is required to
// successfully bootstrap the gossip service. Each seed address is in the
// format of IP:Port, Hostname:Port or DNS Name:Port.
//
// It is ok to include seed addresses that are temporarily unreachable, e.g.
// when launching the first NodeHost instance in your deployment, you can
// include AdvertiseAddresses from other NodeHost instances that you plan to
// launch shortly afterwards.
Seed []string `mapstructure:"seed" json:"seed" yaml:"seed" toml:"seed"`
}
func (c ConfigGossip) GetDGBConfigGossip() dgbcfg.GossipConfig {
d := dgbcfg.GossipConfig{}
if c.BindAddress != "" {
d.BindAddress = c.BindAddress
}
if c.AdvertiseAddress != "" {
d.AdvertiseAddress = c.AdvertiseAddress
}
if len(c.Seed) > 0 {
d.Seed = make([]string, 0)
for _, v := range c.Seed {
if v == "" {
continue
}
d.Seed = append(d.Seed, v)
}
}
return d
}
func (c ConfigGossip) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateGossip.ErrorParent(e)
}
out := ErrorValidateGossip.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

337
cluster/configNode.go Normal file
View File

@@ -0,0 +1,337 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"fmt"
"github.com/go-playground/validator/v10"
dgbcfg "github.com/lni/dragonboat/v3/config"
liberr "github.com/nabbar/golib/errors"
)
//nolint #maligned
type ConfigNode struct {
// DeploymentID is used to determine whether two NodeHost instances belong to
// the same deployment and thus allowed to communicate with each other. This
// helps to prvent accidentially misconfigured NodeHost instances to cause
// data corruption errors by sending out of context messages to unrelated
// Raft nodes.
// For a particular dragonboat based application, you can set DeploymentID
// to the same uint64 value on all production NodeHost instances, then use
// different DeploymentID values on your staging and dev environment. It is
// also recommended to use different DeploymentID values for different
// dragonboat based applications.
// When not set, the default value 0 will be used as the deployment ID and
// thus allowing all NodeHost instances with deployment ID 0 to communicate
// with each other.
DeploymentID uint64 `mapstructure:"deployment_id" json:"deployment_id" yaml:"deployment_id" toml:"deployment_id"`
// WALDir is the directory used for storing the WAL of Raft entries. It is
// recommended to use low latency storage such as NVME SSD with power loss
// protection to store such WAL data. Leave WALDir to have zero value will
// have everything stored in NodeHostDir.
WALDir string `mapstructure:"wal_dir" json:"wal_dir" yaml:"wal_dir" toml:"wal_dir"`
// NodeHostDir is where everything else is stored.
NodeHostDir string `mapstructure:"node_host_dir" json:"node_host_dir" yaml:"node_host_dir" toml:"node_host_dir"`
//nolint #godox
// RTTMillisecond defines the average Rround Trip Time (RTT) in milliseconds
// between two NodeHost instances. Such a RTT interval is internally used as
// a logical clock tick, Raft heartbeat and election intervals are both
// defined in term of how many such logical clock ticks (RTT intervals).
// Note that RTTMillisecond is the combined delays between two NodeHost
// instances including all delays caused by network transmission, delays
// caused by NodeHost queuing and processing. As an example, when fully
// loaded, the average Rround Trip Time between two of our NodeHost instances
// used for benchmarking purposes is up to 500 microseconds when the ping time
// between them is 100 microseconds. Set RTTMillisecond to 1 when it is less
// than 1 million in your environment.
RTTMillisecond uint64 `mapstructure:"rtt_millisecond" json:"rtt_millisecond" yaml:"rtt_millisecond" toml:"rtt_millisecond"`
// RaftAddress is a DNS name:port or IP:port address used by the transport
// module for exchanging Raft messages, snapshots and metadata between
// NodeHost instances. It should be set to the public address that can be
// accessed from remote NodeHost instances.
//
// When the NodeHostConfig.ListenAddress field is empty, NodeHost listens on
// RaftAddress for incoming Raft messages. When hostname or domain name is
// used, it will be resolved to IPv4 addresses first and Dragonboat listens
// to all resolved IPv4 addresses.
//
// By default, the RaftAddress value is not allowed to change between NodeHost
// restarts. AddressByNodeHostID should be set to true when the RaftAddress
// value might change after restart.
RaftAddress string `mapstructure:"raft_address" json:"raft_address" yaml:"raft_address" toml:"raft_address"`
//nolint #godox
// AddressByNodeHostID indicates that NodeHost instances should be addressed
// by their NodeHostID values. This feature is usually used when only dynamic
// addresses are available. When enabled, NodeHostID values should be used
// as the target parameter when calling NodeHost's StartCluster,
// RequestAddNode, RequestAddObserver and RequestAddWitness methods.
//
// Enabling AddressByNodeHostID also enables the internal gossip service,
// NodeHostConfig.Gossip must be configured to control the behaviors of the
// gossip service.
//
// Note that once enabled, the AddressByNodeHostID setting can not be later
// disabled after restarts.
//
// Please see the godocs of the NodeHostConfig.Gossip field for a detailed
// example on how AddressByNodeHostID and gossip works.
AddressByNodeHostID bool `mapstructure:"address_by_node_host_id" json:"address_by_node_host_id" yaml:"address_by_node_host_id" toml:"address_by_node_host_id"`
// ListenAddress is an optional field in the hostname:port or IP:port address
// form used by the transport module to listen on for Raft message and
// snapshots. When the ListenAddress field is not set, The transport module
// listens on RaftAddress. If 0.0.0.0 is specified as the IP of the
// ListenAddress, Dragonboat listens to the specified port on all network
// interfaces. When hostname or domain name is used, it will be resolved to
// IPv4 addresses first and Dragonboat listens to all resolved IPv4 addresses.
ListenAddress string `mapstructure:"listen_address" json:"listen_address" yaml:"listen_address" toml:"listen_address"`
// MutualTLS defines whether to use mutual TLS for authenticating servers
// and clients. Insecure communication is used when MutualTLS is set to
// False.
// See https://github.com/lni/dragonboat/wiki/TLS-in-Dragonboat for more
// details on how to use Mutual TLS.
MutualTLS bool `mapstructure:"mutual_tls" json:"mutual_tls" yaml:"tls" toml:"tls"`
// CAFile is the path of the CA certificate file. This field is ignored when
// MutualTLS is false.
CAFile string `mapstructure:"ca_file" json:"ca_file" yaml:"ca_file" toml:"ca_file"`
// CertFile is the path of the node certificate file. This field is ignored
// when MutualTLS is false.
CertFile string `mapstructure:"cert_file" json:"cert_file" yaml:"cert_file" toml:"cert_file"`
// KeyFile is the path of the node key file. This field is ignored when
// MutualTLS is false.
KeyFile string `mapstructure:"key_file" json:"key_file" yaml:"key_file" toml:"key_file"`
// EnableMetrics determines whether health metrics in Prometheus format should
// be enabled.
EnableMetrics bool `mapstructure:"enable_metrics" json:"enable_metrics" yaml:"enable_metrics" toml:"enable_metrics"`
// MaxSendQueueSize is the maximum size in bytes of each send queue.
// Once the maximum size is reached, further replication messages will be
// dropped to restrict memory usage. When set to 0, it means the send queue
// size is unlimited.
MaxSendQueueSize uint64 `mapstructure:"max_send_queue_size" json:"max_send_queue_size" yaml:"max_send_queue_size" toml:"max_send_queue_size"`
// MaxReceiveQueueSize is the maximum size in bytes of each receive queue.
// Once the maximum size is reached, further replication messages will be
// dropped to restrict memory usage. When set to 0, it means the queue size
// is unlimited.
MaxReceiveQueueSize uint64 `mapstructure:"max_receive_queue_size" json:"max_receive_queue_size" yaml:"max_receive_queue_size" toml:"max_receive_queue_size"`
// MaxSnapshotSendBytesPerSecond defines how much snapshot data can be sent
// every second for all Raft clusters managed by the NodeHost instance.
// The default value 0 means there is no limit set for snapshot streaming.
MaxSnapshotSendBytesPerSecond uint64 `mapstructure:"max_snapshot_send_bytes_per_second" json:"max_snapshot_send_bytes_per_second" yaml:"max_snapshot_send_bytes_per_second" toml:"max_snapshot_send_bytes_per_second"`
// MaxSnapshotRecvBytesPerSecond defines how much snapshot data can be
// received each second for all Raft clusters managed by the NodeHost instance.
// The default value 0 means there is no limit for receiving snapshot data.
MaxSnapshotRecvBytesPerSecond uint64 `mapstructure:"max_snapshot_recv_bytes_per_second" json:"max_snapshot_recv_bytes_per_second" yaml:"max_snapshot_recv_bytes_per_second" toml:"max_snapshot_recv_bytes_per_second"`
// NotifyCommit specifies whether clients should be notified when their
// regular proposals and config change requests are committed. By default,
// commits are not notified, clients are only notified when their proposals
// are both committed and applied.
NotifyCommit bool `mapstructure:"notify_commit" json:"notify_commit" yaml:"notify_commit" toml:"notify_commit"`
// Gossip contains configurations for the gossip service. When the
// AddressByNodeHostID field is set to true, each NodeHost instance will use
// an internal gossip service to exchange knowledges of known NodeHost
// instances including their RaftAddress and NodeHostID values. This Gossip
// field contains configurations that controls how the gossip service works.
//
// As an detailed example on how to use the gossip service in the situation
// where all available machines have dynamically assigned IPs on reboot -
//
// Consider that there are three NodeHost instances on three machines, each
// of them has a dynamically assigned IP address which will change on reboot.
// NodeHostConfig.RaftAddress should be set to the current address that can be
// reached by remote NodeHost instance. In this example, we will assume they
// are
//
// 10.0.0.100:24000
// 10.0.0.200:24000
// 10.0.0.300:24000
//
// To use these machines, first enable the NodeHostConfig.AddressByNodeHostID
// field and start the NodeHost instances. The NodeHostID value of each
// NodeHost instance can be obtained by calling NodeHost.ID(). Let's say they
// are
//
// "nhid-xxxxx",
// "nhid-yyyyy",
// "nhid-zzzzz".
//
// All these NodeHostID are fixed, they will never change after reboots.
//
// When starting Raft nodes or requesting new nodes to be added, use the above
// mentioned NodeHostID values as the target parameters (which are of the
// Target type). Let's say we want to start a Raft Node as a part of a three
// replicas Raft cluster, the initialMembers parameter of the StartCluster
// method can be set to
//
// initialMembers := map[uint64]Target {
// 1: "nhid-xxxxx",
// 2: "nhid-yyyyy",
// 3: "nhid-zzzzz",
// }
//
// This indicates that node 1 of the cluster will be running on the NodeHost
// instance identified by the NodeHostID value "nhid-xxxxx", node 2 of the
// same cluster will be running on the NodeHost instance identified by the
// NodeHostID value of "nhid-yyyyy" and so on.
//
// The internal gossip service exchanges NodeHost details, including their
// NodeHostID and RaftAddress values, with all other known NodeHost instances.
// Thanks to the nature of gossip, it will eventually allow each NodeHost
// instance to be aware of the current details of all NodeHost instances.
// As a result, let's say when Raft node 1 wants to send a Raft message to
// node 2, it first figures out that node 2 is running on the NodeHost
// identified by the NodeHostID value "nhid-yyyyy", RaftAddress information
// from the gossip service further shows that "nhid-yyyyy" maps to a machine
// currently reachable at 10.0.0.200:24000. Raft messages can thus be
// delivered.
//
// The Gossip field here is used to configure how the gossip service works.
// In this example, let's say we choose to use the following configurations
// for those three NodeHost instaces.
//
// GossipConfig {
// BindAddress: "10.0.0.100:24001",
// Seed: []string{10.0.0.200:24001},
// }
//
// GossipConfig {
// BindAddress: "10.0.0.200:24001",
// Seed: []string{10.0.0.300:24001},
// }
//
// GossipConfig {
// BindAddress: "10.0.0.300:24001",
// Seed: []string{10.0.0.100:24001},
// }
//
// For those three machines, the gossip component listens on
// "10.0.0.100:24001", "10.0.0.200:24001" and "10.0.0.300:24001" respectively
// for incoming gossip messages. The Seed field is a list of known gossip end
// points the local gossip service will try to talk to. The Seed field doesn't
// need to include all gossip end points, a few well connected nodes in the
// gossip network is enough.
Gossip ConfigGossip `mapstructure:"gossip" json:"gossip" yaml:"gossip" toml:"gossip"`
// Expert contains options for expert users who are familiar with the internals
// of Dragonboat. Users are recommended not to use this field unless
// absoloutely necessary. It is important to note that any change to this field
// may cause an existing instance unable to restart, it may also cause negative
// performance impacts.
Expert ConfigExpert `mapstructure:"expert" json:"expert" yaml:"expert" toml:"expert"`
}
func (c ConfigNode) GetDGBConfigNodeHost() dgbcfg.NodeHostConfig {
d := dgbcfg.NodeHostConfig{
DeploymentID: c.DeploymentID,
WALDir: c.WALDir,
NodeHostDir: c.NodeHostDir,
RaftAddress: c.RaftAddress,
ListenAddress: c.ListenAddress,
Gossip: c.Gossip.GetDGBConfigGossip(),
Expert: c.Expert.GetDGBConfigExpert(),
}
if c.RTTMillisecond > 0 {
d.RTTMillisecond = c.RTTMillisecond
}
if c.AddressByNodeHostID {
d.AddressByNodeHostID = true
}
if c.MutualTLS && c.CAFile != "" && c.CertFile != "" && c.KeyFile != "" {
d.MutualTLS = true
d.CAFile = c.CAFile
d.CertFile = c.CertFile
d.KeyFile = c.KeyFile
}
if c.EnableMetrics {
d.EnableMetrics = true
}
if c.MaxSendQueueSize > 0 {
d.MaxSendQueueSize = c.MaxSendQueueSize
}
if c.MaxReceiveQueueSize > 0 {
d.MaxReceiveQueueSize = c.MaxReceiveQueueSize
}
if c.MaxSnapshotSendBytesPerSecond > 0 {
d.MaxSnapshotSendBytesPerSecond = c.MaxSnapshotSendBytesPerSecond
}
if c.MaxSnapshotRecvBytesPerSecond > 0 {
d.MaxSnapshotRecvBytesPerSecond = c.MaxSnapshotRecvBytesPerSecond
}
if c.NotifyCommit {
d.NotifyCommit = true
}
return d
}
func (c ConfigNode) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorValidateNode.ErrorParent(e)
}
out := ErrorValidateNode.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}

113
cluster/errors.go Normal file
View File

@@ -0,0 +1,113 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import liberr "github.com/nabbar/golib/errors"
const (
ErrorParamsEmpty liberr.CodeError = iota + liberr.MinPkgCluster
ErrorParamsMissing
ErrorParamsMismatching
ErrorLeader
ErrorLeaderTransfer
ErrorNodeUser
ErrorNodeHostNew
ErrorNodeHostStart
ErrorNodeHostJoin
ErrorNodeHostStop
ErrorNodeHostRestart
ErrorCommandSync
ErrorCommandASync
ErrorCommandLocal
ErrorValidateConfig
ErrorValidateCluster
ErrorValidateNode
ErrorValidateGossip
ErrorValidateExpert
ErrorValidateEngine
)
var isCodeError = false
func IsCodeError() bool {
return isCodeError
}
func init() {
isCodeError = liberr.ExistInMapMessage(ErrorParamsEmpty)
liberr.RegisterIdFctMessage(ErrorParamsEmpty, getMessage)
}
func getMessage(code liberr.CodeError) (message string) {
switch code {
case liberr.UNK_ERROR:
return ""
case ErrorParamsEmpty:
return "at least on given parameters is empty"
case ErrorParamsMissing:
return "at least on given parameters is missing"
case ErrorParamsMismatching:
return "at least on given parameters is mismatching awaiting type"
case ErrorLeader:
return "enable to retrieve cluster leader"
case ErrorLeaderTransfer:
return "enable to transfer cluster leader"
case ErrorNodeUser:
return "enable to retrieve node user"
case ErrorNodeHostNew:
return "enable to init new NodeHost"
case ErrorNodeHostStart:
return "enable to start cluster"
case ErrorNodeHostJoin:
return "enable to join cluster"
case ErrorNodeHostStop:
return "enable to stop cluster or node"
case ErrorNodeHostRestart:
return "enable to restart node properly"
case ErrorCommandSync:
return "enable to call synchronous command"
case ErrorCommandASync:
return "enable to call asynchronous command"
case ErrorCommandLocal:
return "enable to call local command"
case ErrorValidateConfig:
return "config seems to be not valid"
case ErrorValidateCluster:
return "cluster config seems to be not valid"
case ErrorValidateNode:
return "node config seems to be not valid"
case ErrorValidateGossip:
return "gossip config seems to be not valid"
case ErrorValidateExpert:
return "expert config seems to be not valid"
case ErrorValidateEngine:
return "engine config seems to be not valid"
}
return ""
}

121
cluster/interface.go Normal file
View File

@@ -0,0 +1,121 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"context"
"time"
dgbclt "github.com/lni/dragonboat/v3"
dgbcli "github.com/lni/dragonboat/v3/client"
dgbcfg "github.com/lni/dragonboat/v3/config"
dgbstm "github.com/lni/dragonboat/v3/statemachine"
liberr "github.com/nabbar/golib/errors"
)
const (
_DefaultTimeoutCommandSync = 10 * time.Second
_DefaultTimeoutCommandAsync = 100 * time.Second
)
type Cluster interface {
GetConfig() dgbcfg.Config
SetConfig(cfg dgbcfg.Config)
GetNodeHostConfig() dgbcfg.NodeHostConfig
GetFctCreate() dgbstm.CreateStateMachineFunc
GetFctCreateConcurrent() dgbstm.CreateConcurrentStateMachineFunc
GetFctCreateOnDisk() dgbstm.CreateOnDiskStateMachineFunc
SetFctCreate(fctCreate interface{})
SetFctCreateSTM(fctCreate dgbstm.CreateStateMachineFunc)
SetFctCreateSTMConcurrent(fctCreate dgbstm.CreateConcurrentStateMachineFunc)
SetFctCreateSTMOnDisk(fctCreate dgbstm.CreateOnDiskStateMachineFunc)
GetMemberInit() map[uint64]dgbclt.Target
SetMemberInit(memberList map[uint64]dgbclt.Target)
SetTimeoutCommandSync(timeout time.Duration)
SetTimeoutCommandASync(timeout time.Duration)
HasNodeInfo(nodeId uint64) bool
RaftAddress() string
ID() string
ClusterStart(join bool) liberr.Error
ClusterStop(force bool) liberr.Error
ClusterRestart(force bool) liberr.Error
NodeStop(target uint64) liberr.Error
NodeRestart(force bool) liberr.Error
GetLeaderID() (leader uint64, valid bool, err liberr.Error)
GetNoOPSession() *dgbcli.Session
GetNodeHostInfo(opt dgbclt.NodeHostInfoOption) *dgbclt.NodeHostInfo
RequestLeaderTransfer(targetNodeID uint64) liberr.Error
StaleReadDangerous(query interface{}) (interface{}, error)
SyncPropose(parent context.Context, session *dgbcli.Session, cmd []byte) (dgbstm.Result, liberr.Error)
SyncRead(parent context.Context, query interface{}) (interface{}, liberr.Error)
SyncGetClusterMembership(parent context.Context) (*dgbclt.Membership, liberr.Error)
SyncGetSession(parent context.Context) (*dgbcli.Session, liberr.Error)
SyncCloseSession(parent context.Context, cs *dgbcli.Session) liberr.Error
SyncRequestSnapshot(parent context.Context, opt dgbclt.SnapshotOption) (uint64, liberr.Error)
SyncRequestDeleteNode(parent context.Context, nodeID uint64, configChangeIndex uint64) liberr.Error
SyncRequestAddNode(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error
SyncRequestAddObserver(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error
SyncRequestAddWitness(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error
SyncRemoveData(parent context.Context, nodeID uint64) liberr.Error
AsyncPropose(session *dgbcli.Session, cmd []byte) (*dgbclt.RequestState, liberr.Error)
AsyncProposeSession(session *dgbcli.Session) (*dgbclt.RequestState, liberr.Error)
AsyncReadIndex() (*dgbclt.RequestState, liberr.Error)
AsyncRequestCompaction(nodeID uint64) (*dgbclt.SysOpState, liberr.Error)
LocalReadNode(rs *dgbclt.RequestState, query interface{}) (interface{}, liberr.Error)
LocalNAReadNode(rs *dgbclt.RequestState, query []byte) ([]byte, liberr.Error)
}
func NewCluster(cfg Config, fctCreate interface{}) (Cluster, liberr.Error) {
c := &cRaft{
memberInit: cfg.GetInitMember(),
fctCreate: fctCreate,
config: cfg.GetDGBConfigCluster(),
nodeHost: nil,
timeoutCmdSync: _DefaultTimeoutCommandSync,
timeoutCmdASync: _DefaultTimeoutCommandAsync,
}
if n, e := dgbclt.NewNodeHost(cfg.GetDGBConfigNode()); e != nil {
return nil, ErrorNodeHostNew.ErrorParent(e)
} else {
c.nodeHost = n
}
return c, nil
}

53
cluster/local.go Normal file
View File

@@ -0,0 +1,53 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
dgbclt "github.com/lni/dragonboat/v3"
liberr "github.com/nabbar/golib/errors"
)
func (c *cRaft) LocalReadNode(rs *dgbclt.RequestState, query interface{}) (interface{}, liberr.Error) {
i, e := c.nodeHost.ReadLocalNode(rs, query)
if e != nil {
return i, ErrorCommandLocal.ErrorParent(c.getErrorCommand("ReadNode"), e)
}
return i, nil
}
func (c *cRaft) LocalNAReadNode(rs *dgbclt.RequestState, query []byte) ([]byte, liberr.Error) {
r, e := c.nodeHost.NAReadLocalNode(rs, query)
if e != nil {
return r, ErrorCommandLocal.ErrorParent(c.getErrorCommand("ReadNode"), e)
}
return r, nil
}

74
cluster/logger.go Normal file
View File

@@ -0,0 +1,74 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
dgblog "github.com/lni/dragonboat/v3/logger"
liblog "github.com/nabbar/golib/logger"
)
func init() {
dgblog.SetLoggerFactory(func(pkgName string) dgblog.ILogger {
return &logDragonBoart{
pkg: pkgName,
}
})
}
type logDragonBoart struct {
pkg string
}
func (l *logDragonBoart) SetLevel(level dgblog.LogLevel) {
}
func (l *logDragonBoart) Debugf(format string, args ...interface{}) {
var newArg = append(make([]interface{}, 0), l.pkg)
liblog.DebugLevel.Logf("[DragonBoat: %s]"+format, append(newArg, args...)...)
}
func (l *logDragonBoart) Infof(format string, args ...interface{}) {
var newArg = append(make([]interface{}, 0), l.pkg)
liblog.InfoLevel.Logf("[DragonBoat: %s]"+format, append(newArg, args...)...)
}
func (l *logDragonBoart) Warningf(format string, args ...interface{}) {
var newArg = append(make([]interface{}, 0), l.pkg)
liblog.WarnLevel.Logf("[DragonBoat: %s]"+format, append(newArg, args...)...)
}
func (l *logDragonBoart) Errorf(format string, args ...interface{}) {
var newArg = append(make([]interface{}, 0), l.pkg)
liblog.ErrorLevel.Logf("[DragonBoat: %s]"+format, append(newArg, args...)...)
}
func (l *logDragonBoart) Panicf(format string, args ...interface{}) {
var newArg = append(make([]interface{}, 0), l.pkg)
liblog.FatalLevel.Logf("[DragonBoat: %s]"+format, append(newArg, args...)...)
}

245
cluster/sync.go Normal file
View File

@@ -0,0 +1,245 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package cluster
import (
"context"
"time"
dgbclt "github.com/lni/dragonboat/v3"
dgbcli "github.com/lni/dragonboat/v3/client"
dgbstm "github.com/lni/dragonboat/v3/statemachine"
liberr "github.com/nabbar/golib/errors"
)
func (c *cRaft) syncCtxTimeout(parent context.Context) (context.Context, context.CancelFunc) {
var (
ctx context.Context
cnl context.CancelFunc
)
if parent != nil {
ctx, cnl = context.WithDeadline(parent, time.Now().Add(c.timeoutCmdSync))
} else {
ctx, cnl = context.WithDeadline(context.Background(), time.Now().Add(c.timeoutCmdSync))
}
return ctx, cnl
}
func (c *cRaft) syncCtxCancel(cancel context.CancelFunc) {
if cancel != nil {
cancel()
}
}
func (c *cRaft) SyncPropose(parent context.Context, session *dgbcli.Session, cmd []byte) (dgbstm.Result, liberr.Error) {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
r, e := c.nodeHost.SyncPropose(ctx, session, cmd)
if e != nil {
return r, ErrorCommandSync.ErrorParent(c.getErrorCommand("Propose"), e)
}
return r, nil
}
func (c *cRaft) SyncRead(parent context.Context, query interface{}) (interface{}, liberr.Error) {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
r, e := c.nodeHost.SyncRead(ctx, c.config.ClusterID, query)
if e != nil {
return r, ErrorCommandSync.ErrorParent(c.getErrorCluster(), c.getErrorCommand("Read"), e)
}
return r, nil
}
func (c *cRaft) SyncGetClusterMembership(parent context.Context) (*dgbclt.Membership, liberr.Error) {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
r, e := c.nodeHost.SyncGetClusterMembership(ctx, c.config.ClusterID)
if e != nil {
return r, ErrorCommandSync.ErrorParent(c.getErrorCluster(), c.getErrorCommand("GetClusterMembership"), e)
}
return r, nil
}
func (c *cRaft) SyncGetSession(parent context.Context) (*dgbcli.Session, liberr.Error) {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
r, e := c.nodeHost.SyncGetSession(ctx, c.config.ClusterID)
if e != nil {
return r, ErrorCommandSync.ErrorParent(c.getErrorCluster(), c.getErrorCommand("GetSession"), e)
}
return r, nil
}
func (c *cRaft) SyncCloseSession(parent context.Context, cs *dgbcli.Session) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
e := c.nodeHost.SyncCloseSession(ctx, cs)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCommand("CloseSession"), e)
}
return nil
}
func (c *cRaft) SyncRequestSnapshot(parent context.Context, opt dgbclt.SnapshotOption) (uint64, liberr.Error) {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
r, e := c.nodeHost.SyncRequestSnapshot(ctx, c.config.ClusterID, opt)
if e != nil {
return r, ErrorCommandSync.ErrorParent(c.getErrorCluster(), c.getErrorCommand("RequestSnapshot"), e)
}
return r, nil
}
func (c *cRaft) SyncRequestDeleteNode(parent context.Context, nodeID uint64, configChangeIndex uint64) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
var en error
if nodeID == 0 {
nodeID = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(nodeID)
}
e := c.nodeHost.SyncRequestDeleteNode(ctx, c.config.ClusterID, nodeID, configChangeIndex)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCluster(), en, c.getErrorCommand("RequestDeleteNode"), e)
}
return nil
}
//nolint #dupl
func (c *cRaft) SyncRequestAddNode(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
var en error
if nodeID == 0 {
nodeID = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(nodeID)
}
e := c.nodeHost.SyncRequestAddNode(ctx, c.config.ClusterID, nodeID, target, configChangeIndex)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCluster(), en, c.getErrorCommand("RequestAddNode"), e)
}
return nil
}
//nolint #dupl
func (c *cRaft) SyncRequestAddObserver(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
var en error
if nodeID == 0 {
nodeID = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(nodeID)
}
e := c.nodeHost.SyncRequestAddObserver(ctx, c.config.ClusterID, nodeID, target, configChangeIndex)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCluster(), en, c.getErrorCommand("RequestAddObserver"), e)
}
return nil
}
//nolint #dupl
func (c *cRaft) SyncRequestAddWitness(parent context.Context, nodeID uint64, target string, configChangeIndex uint64) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
var en error
if nodeID == 0 {
nodeID = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(nodeID)
}
e := c.nodeHost.SyncRequestAddWitness(ctx, c.config.ClusterID, nodeID, target, configChangeIndex)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCluster(), en, c.getErrorCommand("RequestAddWitness"), e)
}
return nil
}
func (c *cRaft) SyncRemoveData(parent context.Context, nodeID uint64) liberr.Error {
ctx, cnl := c.syncCtxTimeout(parent)
defer c.syncCtxCancel(cnl)
var en error
if nodeID == 0 {
nodeID = c.config.NodeID
en = c.getErrorNode()
} else {
en = c.getErrorNodeTarget(nodeID)
}
e := c.nodeHost.SyncRemoveData(ctx, c.config.ClusterID, nodeID)
if e != nil {
return ErrorCommandSync.ErrorParent(c.getErrorCluster(), en, c.getErrorCommand("RemoveData"), e)
}
return nil
}