Removed RaftBindPort config. Raft bind address and port are dynamically assigned on startup instead of configured manually. Replaced memberlist-port configuration with discovery-port.

This commit is contained in:
Kelvin Mwinuka
2024-06-20 04:20:31 +08:00
parent 75baaa5c47
commit 9b0d590171
9 changed files with 129 additions and 140 deletions

View File

@@ -33,8 +33,7 @@ WORKDIR /opt/echovault/bin
CMD "./server" \
"--bind-addr" "${BIND_ADDR}" \
"--port" "${PORT}" \
"--memberlist-port" "${ML_PORT}" \
"--raft-port" "${RAFT_PORT}" \
"--discovery-port" "${DISCOVERY_PORT}" \
"--server-id" "${SERVER_ID}" \
"--join-addr" "${JOIN_ADDR}" \
"--data-dir" "${DATA_DIR}" \

View File

@@ -33,15 +33,6 @@ func main() {
ctx := context.WithValue(context.Background(), internal.ContextServerID("ServerID"), conf.ServerID)
// Default BindAddr if it's not specified
if conf.BindAddr == "" {
if addr, err := internal.GetIPAddress(); err != nil {
log.Fatal(err)
} else {
conf.BindAddr = addr
}
}
cancelCh := make(chan os.Signal, 1)
signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

View File

@@ -11,9 +11,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=1
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
@@ -44,7 +44,6 @@ services:
ports:
- "7480:7480"
- "7946:7946"
- "7999:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/modules:/lib/echovault/modules
@@ -58,9 +57,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=1
- JOIN_ADDR=2/cluster_node_2:7946
- DATA_DIR=/var/lib/echovault
@@ -90,7 +89,6 @@ services:
ports:
- "7481:7480"
- "7945:7946"
- "8000:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
@@ -104,9 +102,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=2
- JOIN_ADDR=3/cluster_node_3:7946
- DATA_DIR=/var/lib/echovault
@@ -136,7 +134,6 @@ services:
ports:
- "7482:7480"
- "7947:7946"
- "8001:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
@@ -150,9 +147,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=3
- JOIN_ADDR=4/cluster_node_4:7946
- DATA_DIR=/var/lib/echovault
@@ -182,7 +179,6 @@ services:
ports:
- "7483:7480"
- "7948:7946"
- "8002:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
@@ -196,9 +192,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=4
- JOIN_ADDR=5/cluster_node_5:7946
- DATA_DIR=/var/lib/echovault
@@ -228,7 +224,6 @@ services:
ports:
- "7484:7480"
- "7949:7946"
- "8003:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
@@ -242,9 +237,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=5
- JOIN_ADDR=1/cluster_node_1:7946
- DATA_DIR=/var/lib/echovault
@@ -274,7 +269,6 @@ services:
ports:
- "7485:7480"
- "7950:7946"
- "8004:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins

View File

@@ -41,8 +41,7 @@ type ClientServerPair struct {
serverId string
bindAddr string
port int
raftPort int
mlPort int
discoveryPort int
bootstrapCluster bool
forwardCommand bool
joinAddr string
@@ -79,8 +78,7 @@ func setupServer(
bindAddr,
joinAddr string,
port,
raftPort,
mlPort int,
discoveryPort int,
) (*EchoVault, error) {
conf := DefaultConfig()
conf.DataDir = dataDir
@@ -89,8 +87,7 @@ func setupServer(
conf.JoinAddr = joinAddr
conf.Port = uint16(port)
conf.ServerID = serverId
conf.RaftBindPort = uint16(raftPort)
conf.MemberListBindPort = uint16(mlPort)
conf.DiscoveryPort = uint16(discoveryPort)
conf.BootstrapCluster = bootstrapCluster
conf.EvictionPolicy = constants.NoEviction
@@ -109,8 +106,7 @@ func setupNode(node *ClientServerPair, isLeader bool, errChan *chan error) {
node.bindAddr,
node.joinAddr,
node.port,
node.raftPort,
node.mlPort,
node.discoveryPort,
)
if err != nil {
*errChan <- fmt.Errorf("could not start server; %v", err)
@@ -161,17 +157,13 @@ func makeCluster(size int) ([]ClientServerPair, error) {
forwardCommand := i < len(pairs)-1 // The last node will not forward commands to the cluster leader.
joinAddr := ""
if !bootstrapCluster {
joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].mlPort)
joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].discoveryPort)
}
port, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free port: %v", err)
}
raftPort, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free raft port: %v", err)
}
memberlistPort, err := internal.GetFreePort()
discoveryPort, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free memberlist port: %v", err)
}
@@ -181,8 +173,7 @@ func makeCluster(size int) ([]ClientServerPair, error) {
serverId: serverId,
bindAddr: bindAddr,
port: port,
raftPort: raftPort,
mlPort: memberlistPort,
discoveryPort: discoveryPort,
bootstrapCluster: bootstrapCluster,
forwardCommand: forwardCommand,
joinAddr: joinAddr,

View File

@@ -32,32 +32,33 @@ import (
)
type Config struct {
TLS bool `json:"TLS" yaml:"TLS"`
MTLS bool `json:"MTLS" yaml:"MTLS"`
CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"`
ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"`
Port uint16 `json:"Port" yaml:"Port"`
ServerID string `json:"ServerId" yaml:"ServerId"`
JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"`
BindAddr string `json:"BindAddr" yaml:"BindAddr"`
RaftBindPort uint16 `json:"RaftPort" yaml:"RaftPort"`
MemberListBindPort uint16 `json:"MlPort" yaml:"MlPort"`
DataDir string `json:"DataDir" yaml:"DataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"`
AclConfig string `json:"AclConfig" yaml:"AclConfig"`
ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"`
RequirePass bool `json:"RequirePass" yaml:"RequirePass"`
Password string `json:"Password" yaml:"Password"`
SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"`
SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"`
RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"`
RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"`
AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"`
MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"`
EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"`
EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"`
EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"`
Modules []string `json:"Plugins" yaml:"Plugins"`
TLS bool `json:"TLS" yaml:"TLS"`
MTLS bool `json:"MTLS" yaml:"MTLS"`
CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"`
ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"`
Port uint16 `json:"Port" yaml:"Port"`
ServerID string `json:"ServerId" yaml:"ServerId"`
JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"`
BindAddr string `json:"BindAddr" yaml:"BindAddr"`
DataDir string `json:"DataDir" yaml:"DataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"`
AclConfig string `json:"AclConfig" yaml:"AclConfig"`
ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"`
RequirePass bool `json:"RequirePass" yaml:"RequirePass"`
Password string `json:"Password" yaml:"Password"`
SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"`
SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"`
RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"`
RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"`
AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"`
MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"`
EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"`
EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"`
EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"`
Modules []string `json:"Plugins" yaml:"Plugins"`
DiscoveryPort uint16
RaftBindAddr string
RaftBindPort uint16
}
func GetConfig() (Config, error) {
@@ -148,9 +149,8 @@ There is no limit by default.`, func(memory string) error {
port := flag.Int("port", 7480, "Port to use. Default is 7480")
serverId := flag.String("server-id", "1", "EchoVault ID in raft cluster. Leave empty for client.")
joinAddr := flag.String("join-addr", "", "Address of cluster member in a cluster to you want to join.")
bindAddr := flag.String("bind-addr", "", "Address to bind the echovault to.")
raftBindPort := flag.Uint("raft-port", 7481, "Port to use for intra-cluster communication. Leave on the client.")
mlBindPort := flag.Uint("memberlist-port", 7946, "Port to use for memberlist communication.")
bindAddr := flag.String("bind-addr", "127.0.0.1", "Address to bind the echovault to.")
discoveryPort := flag.Uint("discovery-port", 7946, "Port to use for memberlist cluster discovery.")
dataDir := flag.String("data-dir", ".", "Directory to store snapshots and logs.")
bootstrapCluster := flag.Bool("bootstrap-cluster", false, "Whether this instance should bootstrap a new cluster.")
aclConfig := flag.String("acl-config", "", "ACL config file path.")
@@ -184,33 +184,43 @@ It is a plain text value by default but you can provide a SHA256 hash by adding
flag.Parse()
raftBindAddr, e := internal.GetIPAddress()
if e != nil {
return Config{}, e
}
raftBindPort, e := internal.GetFreePort()
if e != nil {
return Config{}, e
}
conf := Config{
CertKeyPairs: certKeyPairs,
ClientCAs: clientCAs,
TLS: *tls,
MTLS: *mtls,
Port: uint16(*port),
ServerID: *serverId,
JoinAddr: *joinAddr,
BindAddr: *bindAddr,
RaftBindPort: uint16(*raftBindPort),
MemberListBindPort: uint16(*mlBindPort),
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
AclConfig: *aclConfig,
ForwardCommand: *forwardCommand,
RequirePass: *requirePass,
Password: *password,
SnapShotThreshold: *snapshotThreshold,
SnapshotInterval: *snapshotInterval,
RestoreSnapshot: *restoreSnapshot,
RestoreAOF: *restoreAOF,
AOFSyncStrategy: aofSyncStrategy,
MaxMemory: maxMemory,
EvictionPolicy: evictionPolicy,
EvictionSample: *evictionSample,
EvictionInterval: *evictionInterval,
Modules: modules,
CertKeyPairs: certKeyPairs,
ClientCAs: clientCAs,
TLS: *tls,
MTLS: *mtls,
Port: uint16(*port),
ServerID: *serverId,
JoinAddr: *joinAddr,
BindAddr: *bindAddr,
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
AclConfig: *aclConfig,
ForwardCommand: *forwardCommand,
RequirePass: *requirePass,
Password: *password,
SnapShotThreshold: *snapshotThreshold,
SnapshotInterval: *snapshotInterval,
RestoreSnapshot: *restoreSnapshot,
RestoreAOF: *restoreAOF,
AOFSyncStrategy: aofSyncStrategy,
MaxMemory: maxMemory,
EvictionPolicy: evictionPolicy,
EvictionSample: *evictionSample,
EvictionInterval: *evictionInterval,
Modules: modules,
DiscoveryPort: uint16(*discoveryPort),
RaftBindAddr: raftBindAddr,
RaftBindPort: uint16(raftBindPort),
}
if len(*config) > 0 {

View File

@@ -1,37 +1,42 @@
package config
import (
"github.com/echovault/echovault/internal"
"github.com/echovault/echovault/internal/constants"
"time"
)
func DefaultConfig() Config {
raftBindAddr, _ := internal.GetIPAddress()
raftBindPort, _ := internal.GetFreePort()
return Config{
TLS: false,
MTLS: false,
CertKeyPairs: make([][]string, 0),
ClientCAs: make([]string, 0),
Port: 7480,
ServerID: "",
JoinAddr: "",
BindAddr: "localhost",
RaftBindPort: 7481,
MemberListBindPort: 7946,
DataDir: ".",
BootstrapCluster: false,
AclConfig: "",
ForwardCommand: false,
RequirePass: false,
Password: "",
SnapShotThreshold: 1000,
SnapshotInterval: 5 * time.Minute,
RestoreAOF: false,
RestoreSnapshot: false,
AOFSyncStrategy: "everysec",
MaxMemory: 0,
EvictionPolicy: constants.NoEviction,
EvictionSample: 20,
EvictionInterval: 100 * time.Millisecond,
Modules: make([]string, 0),
TLS: false,
MTLS: false,
CertKeyPairs: make([][]string, 0),
ClientCAs: make([]string, 0),
Port: 7480,
ServerID: "",
JoinAddr: "",
BindAddr: "localhost",
RaftBindAddr: raftBindAddr,
RaftBindPort: uint16(raftBindPort),
DiscoveryPort: 7946,
DataDir: ".",
BootstrapCluster: false,
AclConfig: "",
ForwardCommand: false,
RequirePass: false,
Password: "",
SnapShotThreshold: 1000,
SnapshotInterval: 5 * time.Minute,
RestoreAOF: false,
RestoreSnapshot: false,
AOFSyncStrategy: "everysec",
MaxMemory: 0,
EvictionPolicy: constants.NoEviction,
EvictionSample: 20,
EvictionInterval: 100 * time.Millisecond,
Modules: make([]string, 0),
}
}

View File

@@ -50,8 +50,8 @@ func (delegate *Delegate) NodeMeta(limit int) []byte {
meta := NodeMeta{
ServerID: raft.ServerID(delegate.options.config.ServerID),
RaftAddr: raft.ServerAddress(
fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.RaftBindPort)),
MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.MemberListBindPort),
fmt.Sprintf("%s:%d", delegate.options.config.RaftBindAddr, delegate.options.config.RaftBindPort)),
MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.DiscoveryPort),
}
b, err := json.Marshal(&meta)
@@ -73,7 +73,7 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) {
switch msg.Action {
case "RaftJoin":
// If the current node is not the cluster leader, re-broadcast the message
// If the current node is not the cluster leader, re-broadcast the message.
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return
@@ -84,12 +84,12 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) {
}
case "DeleteKey":
// If the current node is not a cluster leader, re-broadcast the message
// If the current node is not a cluster leader, re-broadcast the message.
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return
}
// Current node is the cluster leader, handle the key deletion
// Current node is the cluster leader, handle the key deletion.
ctx := context.WithValue(
context.WithValue(context.Background(), internal.ContextServerID("ServerID"), string(msg.ServerID)),
internal.ContextConnID("ConnectionID"), msg.ConnId)
@@ -101,7 +101,7 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) {
}
case "MutateData":
// If the current node is not a cluster leader, re-broadcast the message
// If the current node is not a cluster leader, re-broadcast the message.
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return

View File

@@ -64,7 +64,7 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
cfg.RequireNodeNames = true
cfg.Name = m.options.Config.ServerID
cfg.BindAddr = m.options.Config.BindAddr
cfg.BindPort = int(m.options.Config.MemberListBindPort)
cfg.BindPort = int(m.options.Config.DiscoveryPort)
cfg.Delegate = NewDelegate(DelegateOpts{
config: m.options.Config,
broadcastQueue: m.broadcastQueue,
@@ -116,7 +116,7 @@ func (m *MemberList) broadcastRaftAddress() {
NodeMeta: NodeMeta{
ServerID: raft.ServerID(m.options.Config.ServerID),
RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d",
m.options.Config.BindAddr, m.options.Config.RaftBindPort)),
m.options.Config.RaftBindAddr, m.options.Config.RaftBindPort)),
},
}
m.broadcastQueue.QueueBroadcast(&msg)

View File

@@ -91,15 +91,14 @@ func (r *Raft) RaftInit(ctx context.Context) {
}
}
addr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.RaftBindPort)
advertiseAddr, err := net.ResolveTCPAddr("tcp", addr)
bindAddr := fmt.Sprintf("%s:%d", conf.RaftBindAddr, conf.RaftBindPort)
advertiseAddr, err := net.ResolveTCPAddr("tcp", bindAddr)
if err != nil {
log.Fatal(err)
}
raftTransport, err := raft.NewTCPTransport(
addr,
bindAddr,
advertiseAddr,
10,
5*time.Second,
@@ -142,7 +141,7 @@ func (r *Raft) RaftInit(ctx context.Context) {
{
Suffrage: raft.Voter,
ID: raft.ServerID(conf.ServerID),
Address: raft.ServerAddress(addr),
Address: raft.ServerAddress(conf.RaftBindAddr),
},
},
}).Error()
@@ -185,7 +184,7 @@ func (r *Raft) AddVoter(
}
for _, s := range raftConfig.Configuration().Servers {
// Check if a echovault already exists with the current attributes
// Check if a node already exists with the current attributes.
if s.ID == id && s.Address == address {
return fmt.Errorf("node with id %s and address %s already exists", id, address)
}