diff --git a/Dockerfile.dev b/Dockerfile.dev index 9f097dd..d1da77c 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -33,8 +33,7 @@ WORKDIR /opt/echovault/bin CMD "./server" \ "--bind-addr" "${BIND_ADDR}" \ "--port" "${PORT}" \ - "--memberlist-port" "${ML_PORT}" \ - "--raft-port" "${RAFT_PORT}" \ + "--discovery-port" "${DISCOVERY_PORT}" \ "--server-id" "${SERVER_ID}" \ "--join-addr" "${JOIN_ADDR}" \ "--data-dir" "${DATA_DIR}" \ diff --git a/cmd/main.go b/cmd/main.go index a36f0fd..6bd3da2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -33,15 +33,6 @@ func main() { ctx := context.WithValue(context.Background(), internal.ContextServerID("ServerID"), conf.ServerID) - // Default BindAddr if it's not specified - if conf.BindAddr == "" { - if addr, err := internal.GetIPAddress(); err != nil { - log.Fatal(err) - } else { - conf.BindAddr = addr - } - } - cancelCh := make(chan os.Signal, 1) signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) diff --git a/docker-compose.yaml b/docker-compose.yaml index c10ca42..56c1ce9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,9 +11,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=1 - PLUGIN_DIR=/usr/local/lib/echovault - DATA_DIR=/var/lib/echovault @@ -44,7 +44,6 @@ services: ports: - "7480:7480" - "7946:7946" - - "7999:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/modules:/lib/echovault/modules @@ -58,9 +57,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=1 - JOIN_ADDR=2/cluster_node_2:7946 - DATA_DIR=/var/lib/echovault @@ -90,7 +89,6 @@ services: ports: - "7481:7480" - "7945:7946" - - "8000:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/plugins:/lib/echovault/plugins @@ -104,9 +102,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=2 - JOIN_ADDR=3/cluster_node_3:7946 - DATA_DIR=/var/lib/echovault @@ -136,7 +134,6 @@ services: ports: - "7482:7480" - "7947:7946" - - "8001:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/plugins:/lib/echovault/plugins @@ -150,9 +147,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=3 - JOIN_ADDR=4/cluster_node_4:7946 - DATA_DIR=/var/lib/echovault @@ -182,7 +179,6 @@ services: ports: - "7483:7480" - "7948:7946" - - "8002:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/plugins:/lib/echovault/plugins @@ -196,9 +192,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=4 - JOIN_ADDR=5/cluster_node_5:7946 - DATA_DIR=/var/lib/echovault @@ -228,7 +224,6 @@ services: ports: - "7484:7480" - "7949:7946" - - "8003:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/plugins:/lib/echovault/plugins @@ -242,9 +237,9 @@ services: context: . dockerfile: Dockerfile.dev environment: + - BIND_ADDR=0.0.0.0 - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 + - DISCOVERY_PORT=7946 - SERVER_ID=5 - JOIN_ADDR=1/cluster_node_1:7946 - DATA_DIR=/var/lib/echovault @@ -274,7 +269,6 @@ services: ports: - "7485:7480" - "7950:7946" - - "8004:8000" volumes: - ./volumes/config:/etc/echovault/config - ./volumes/plugins:/lib/echovault/plugins diff --git a/echovault/echovault_test.go b/echovault/echovault_test.go index 8bd1090..624aad4 100644 --- a/echovault/echovault_test.go +++ b/echovault/echovault_test.go @@ -41,8 +41,7 @@ type ClientServerPair struct { serverId string bindAddr string port int - raftPort int - mlPort int + discoveryPort int bootstrapCluster bool forwardCommand bool joinAddr string @@ -79,8 +78,7 @@ func setupServer( bindAddr, joinAddr string, port, - raftPort, - mlPort int, + discoveryPort int, ) (*EchoVault, error) { conf := DefaultConfig() conf.DataDir = dataDir @@ -89,8 +87,7 @@ func setupServer( conf.JoinAddr = joinAddr conf.Port = uint16(port) conf.ServerID = serverId - conf.RaftBindPort = uint16(raftPort) - conf.MemberListBindPort = uint16(mlPort) + conf.DiscoveryPort = uint16(discoveryPort) conf.BootstrapCluster = bootstrapCluster conf.EvictionPolicy = constants.NoEviction @@ -109,8 +106,7 @@ func setupNode(node *ClientServerPair, isLeader bool, errChan *chan error) { node.bindAddr, node.joinAddr, node.port, - node.raftPort, - node.mlPort, + node.discoveryPort, ) if err != nil { *errChan <- fmt.Errorf("could not start server; %v", err) @@ -161,17 +157,13 @@ func makeCluster(size int) ([]ClientServerPair, error) { forwardCommand := i < len(pairs)-1 // The last node will not forward commands to the cluster leader. joinAddr := "" if !bootstrapCluster { - joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].mlPort) + joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].discoveryPort) } port, err := internal.GetFreePort() if err != nil { return nil, fmt.Errorf("could not get free port: %v", err) } - raftPort, err := internal.GetFreePort() - if err != nil { - return nil, fmt.Errorf("could not get free raft port: %v", err) - } - memberlistPort, err := internal.GetFreePort() + discoveryPort, err := internal.GetFreePort() if err != nil { return nil, fmt.Errorf("could not get free memberlist port: %v", err) } @@ -181,8 +173,7 @@ func makeCluster(size int) ([]ClientServerPair, error) { serverId: serverId, bindAddr: bindAddr, port: port, - raftPort: raftPort, - mlPort: memberlistPort, + discoveryPort: discoveryPort, bootstrapCluster: bootstrapCluster, forwardCommand: forwardCommand, joinAddr: joinAddr, diff --git a/internal/config/config.go b/internal/config/config.go index 34d9d7a..6c4e9f5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,32 +32,33 @@ import ( ) type Config struct { - TLS bool `json:"TLS" yaml:"TLS"` - MTLS bool `json:"MTLS" yaml:"MTLS"` - CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"` - ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"` - Port uint16 `json:"Port" yaml:"Port"` - ServerID string `json:"ServerId" yaml:"ServerId"` - JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"` - BindAddr string `json:"BindAddr" yaml:"BindAddr"` - RaftBindPort uint16 `json:"RaftPort" yaml:"RaftPort"` - MemberListBindPort uint16 `json:"MlPort" yaml:"MlPort"` - DataDir string `json:"DataDir" yaml:"DataDir"` - BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"` - AclConfig string `json:"AclConfig" yaml:"AclConfig"` - ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"` - RequirePass bool `json:"RequirePass" yaml:"RequirePass"` - Password string `json:"Password" yaml:"Password"` - SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"` - SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"` - RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"` - RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"` - AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"` - MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"` - EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"` - EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"` - EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"` - Modules []string `json:"Plugins" yaml:"Plugins"` + TLS bool `json:"TLS" yaml:"TLS"` + MTLS bool `json:"MTLS" yaml:"MTLS"` + CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"` + ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"` + Port uint16 `json:"Port" yaml:"Port"` + ServerID string `json:"ServerId" yaml:"ServerId"` + JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"` + BindAddr string `json:"BindAddr" yaml:"BindAddr"` + DataDir string `json:"DataDir" yaml:"DataDir"` + BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"` + AclConfig string `json:"AclConfig" yaml:"AclConfig"` + ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"` + RequirePass bool `json:"RequirePass" yaml:"RequirePass"` + Password string `json:"Password" yaml:"Password"` + SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"` + SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"` + RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"` + RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"` + AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"` + MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"` + EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"` + EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"` + EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"` + Modules []string `json:"Plugins" yaml:"Plugins"` + DiscoveryPort uint16 + RaftBindAddr string + RaftBindPort uint16 } func GetConfig() (Config, error) { @@ -148,9 +149,8 @@ There is no limit by default.`, func(memory string) error { port := flag.Int("port", 7480, "Port to use. Default is 7480") serverId := flag.String("server-id", "1", "EchoVault ID in raft cluster. Leave empty for client.") joinAddr := flag.String("join-addr", "", "Address of cluster member in a cluster to you want to join.") - bindAddr := flag.String("bind-addr", "", "Address to bind the echovault to.") - raftBindPort := flag.Uint("raft-port", 7481, "Port to use for intra-cluster communication. Leave on the client.") - mlBindPort := flag.Uint("memberlist-port", 7946, "Port to use for memberlist communication.") + bindAddr := flag.String("bind-addr", "127.0.0.1", "Address to bind the echovault to.") + discoveryPort := flag.Uint("discovery-port", 7946, "Port to use for memberlist cluster discovery.") dataDir := flag.String("data-dir", ".", "Directory to store snapshots and logs.") bootstrapCluster := flag.Bool("bootstrap-cluster", false, "Whether this instance should bootstrap a new cluster.") aclConfig := flag.String("acl-config", "", "ACL config file path.") @@ -184,33 +184,43 @@ It is a plain text value by default but you can provide a SHA256 hash by adding flag.Parse() + raftBindAddr, e := internal.GetIPAddress() + if e != nil { + return Config{}, e + } + raftBindPort, e := internal.GetFreePort() + if e != nil { + return Config{}, e + } + conf := Config{ - CertKeyPairs: certKeyPairs, - ClientCAs: clientCAs, - TLS: *tls, - MTLS: *mtls, - Port: uint16(*port), - ServerID: *serverId, - JoinAddr: *joinAddr, - BindAddr: *bindAddr, - RaftBindPort: uint16(*raftBindPort), - MemberListBindPort: uint16(*mlBindPort), - DataDir: *dataDir, - BootstrapCluster: *bootstrapCluster, - AclConfig: *aclConfig, - ForwardCommand: *forwardCommand, - RequirePass: *requirePass, - Password: *password, - SnapShotThreshold: *snapshotThreshold, - SnapshotInterval: *snapshotInterval, - RestoreSnapshot: *restoreSnapshot, - RestoreAOF: *restoreAOF, - AOFSyncStrategy: aofSyncStrategy, - MaxMemory: maxMemory, - EvictionPolicy: evictionPolicy, - EvictionSample: *evictionSample, - EvictionInterval: *evictionInterval, - Modules: modules, + CertKeyPairs: certKeyPairs, + ClientCAs: clientCAs, + TLS: *tls, + MTLS: *mtls, + Port: uint16(*port), + ServerID: *serverId, + JoinAddr: *joinAddr, + BindAddr: *bindAddr, + DataDir: *dataDir, + BootstrapCluster: *bootstrapCluster, + AclConfig: *aclConfig, + ForwardCommand: *forwardCommand, + RequirePass: *requirePass, + Password: *password, + SnapShotThreshold: *snapshotThreshold, + SnapshotInterval: *snapshotInterval, + RestoreSnapshot: *restoreSnapshot, + RestoreAOF: *restoreAOF, + AOFSyncStrategy: aofSyncStrategy, + MaxMemory: maxMemory, + EvictionPolicy: evictionPolicy, + EvictionSample: *evictionSample, + EvictionInterval: *evictionInterval, + Modules: modules, + DiscoveryPort: uint16(*discoveryPort), + RaftBindAddr: raftBindAddr, + RaftBindPort: uint16(raftBindPort), } if len(*config) > 0 { diff --git a/internal/config/default.go b/internal/config/default.go index 9dd9c0d..b16a545 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -1,37 +1,42 @@ package config import ( + "github.com/echovault/echovault/internal" "github.com/echovault/echovault/internal/constants" "time" ) func DefaultConfig() Config { + raftBindAddr, _ := internal.GetIPAddress() + raftBindPort, _ := internal.GetFreePort() + return Config{ - TLS: false, - MTLS: false, - CertKeyPairs: make([][]string, 0), - ClientCAs: make([]string, 0), - Port: 7480, - ServerID: "", - JoinAddr: "", - BindAddr: "localhost", - RaftBindPort: 7481, - MemberListBindPort: 7946, - DataDir: ".", - BootstrapCluster: false, - AclConfig: "", - ForwardCommand: false, - RequirePass: false, - Password: "", - SnapShotThreshold: 1000, - SnapshotInterval: 5 * time.Minute, - RestoreAOF: false, - RestoreSnapshot: false, - AOFSyncStrategy: "everysec", - MaxMemory: 0, - EvictionPolicy: constants.NoEviction, - EvictionSample: 20, - EvictionInterval: 100 * time.Millisecond, - Modules: make([]string, 0), + TLS: false, + MTLS: false, + CertKeyPairs: make([][]string, 0), + ClientCAs: make([]string, 0), + Port: 7480, + ServerID: "", + JoinAddr: "", + BindAddr: "localhost", + RaftBindAddr: raftBindAddr, + RaftBindPort: uint16(raftBindPort), + DiscoveryPort: 7946, + DataDir: ".", + BootstrapCluster: false, + AclConfig: "", + ForwardCommand: false, + RequirePass: false, + Password: "", + SnapShotThreshold: 1000, + SnapshotInterval: 5 * time.Minute, + RestoreAOF: false, + RestoreSnapshot: false, + AOFSyncStrategy: "everysec", + MaxMemory: 0, + EvictionPolicy: constants.NoEviction, + EvictionSample: 20, + EvictionInterval: 100 * time.Millisecond, + Modules: make([]string, 0), } } diff --git a/internal/memberlist/delegate.go b/internal/memberlist/delegate.go index b6673ae..26bf200 100644 --- a/internal/memberlist/delegate.go +++ b/internal/memberlist/delegate.go @@ -50,8 +50,8 @@ func (delegate *Delegate) NodeMeta(limit int) []byte { meta := NodeMeta{ ServerID: raft.ServerID(delegate.options.config.ServerID), RaftAddr: raft.ServerAddress( - fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.RaftBindPort)), - MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.MemberListBindPort), + fmt.Sprintf("%s:%d", delegate.options.config.RaftBindAddr, delegate.options.config.RaftBindPort)), + MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.DiscoveryPort), } b, err := json.Marshal(&meta) @@ -73,7 +73,7 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) { switch msg.Action { case "RaftJoin": - // If the current node is not the cluster leader, re-broadcast the message + // If the current node is not the cluster leader, re-broadcast the message. if !delegate.options.isRaftLeader() { delegate.options.broadcastQueue.QueueBroadcast(&msg) return @@ -84,12 +84,12 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) { } case "DeleteKey": - // If the current node is not a cluster leader, re-broadcast the message + // If the current node is not a cluster leader, re-broadcast the message. if !delegate.options.isRaftLeader() { delegate.options.broadcastQueue.QueueBroadcast(&msg) return } - // Current node is the cluster leader, handle the key deletion + // Current node is the cluster leader, handle the key deletion. ctx := context.WithValue( context.WithValue(context.Background(), internal.ContextServerID("ServerID"), string(msg.ServerID)), internal.ContextConnID("ConnectionID"), msg.ConnId) @@ -101,7 +101,7 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) { } case "MutateData": - // If the current node is not a cluster leader, re-broadcast the message + // If the current node is not a cluster leader, re-broadcast the message. if !delegate.options.isRaftLeader() { delegate.options.broadcastQueue.QueueBroadcast(&msg) return diff --git a/internal/memberlist/memberlist.go b/internal/memberlist/memberlist.go index bf25804..895e1cf 100644 --- a/internal/memberlist/memberlist.go +++ b/internal/memberlist/memberlist.go @@ -64,7 +64,7 @@ func (m *MemberList) MemberListInit(ctx context.Context) { cfg.RequireNodeNames = true cfg.Name = m.options.Config.ServerID cfg.BindAddr = m.options.Config.BindAddr - cfg.BindPort = int(m.options.Config.MemberListBindPort) + cfg.BindPort = int(m.options.Config.DiscoveryPort) cfg.Delegate = NewDelegate(DelegateOpts{ config: m.options.Config, broadcastQueue: m.broadcastQueue, @@ -116,7 +116,7 @@ func (m *MemberList) broadcastRaftAddress() { NodeMeta: NodeMeta{ ServerID: raft.ServerID(m.options.Config.ServerID), RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d", - m.options.Config.BindAddr, m.options.Config.RaftBindPort)), + m.options.Config.RaftBindAddr, m.options.Config.RaftBindPort)), }, } m.broadcastQueue.QueueBroadcast(&msg) diff --git a/internal/raft/raft.go b/internal/raft/raft.go index 0c3dae5..dd8d81d 100644 --- a/internal/raft/raft.go +++ b/internal/raft/raft.go @@ -91,15 +91,14 @@ func (r *Raft) RaftInit(ctx context.Context) { } } - addr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.RaftBindPort) - - advertiseAddr, err := net.ResolveTCPAddr("tcp", addr) + bindAddr := fmt.Sprintf("%s:%d", conf.RaftBindAddr, conf.RaftBindPort) + advertiseAddr, err := net.ResolveTCPAddr("tcp", bindAddr) if err != nil { log.Fatal(err) } raftTransport, err := raft.NewTCPTransport( - addr, + bindAddr, advertiseAddr, 10, 5*time.Second, @@ -142,7 +141,7 @@ func (r *Raft) RaftInit(ctx context.Context) { { Suffrage: raft.Voter, ID: raft.ServerID(conf.ServerID), - Address: raft.ServerAddress(addr), + Address: raft.ServerAddress(conf.RaftBindAddr), }, }, }).Error() @@ -185,7 +184,7 @@ func (r *Raft) AddVoter( } for _, s := range raftConfig.Configuration().Servers { - // Check if a echovault already exists with the current attributes + // Check if a node already exists with the current attributes. if s.ID == id && s.Address == address { return fmt.Errorf("node with id %s and address %s already exists", id, address) }