Re-enabled cluster nodes in docker-compose.yml.

Created flow to forward key deletion command from non-leader node to leader node.
Created flow for propagating key deletion accross the entire cluster to maintain consistency of the deleted keys.
This commit is contained in:
Kelvin Mwinuka
2024-03-11 20:39:57 +08:00
parent c611dd60ee
commit 52b39d5b0f
7 changed files with 294 additions and 225 deletions

View File

@@ -47,216 +47,216 @@ services:
networks: networks:
- testnet - testnet
# cluster_node_1: cluster_node_1:
# container_name: cluster_node_1 container_name: cluster_node_1
# build: build:
# context: . context: .
# dockerfile: Dockerfile.dev dockerfile: Dockerfile.dev
# environment: environment:
# - PORT=7480 - PORT=7480
# - RAFT_PORT=8000 - RAFT_PORT=8000
# - ML_PORT=7946 - ML_PORT=7946
# - KEY=/generic/ssl/certs/echovault/server1.key - KEY=/generic/ssl/certs/echovault/server1.key
# - CERT=/generic/ssl/certs/echovault/server1.crt - CERT=/generic/ssl/certs/echovault/server1.crt
# - SERVER_ID=1 - SERVER_ID=1
# - DATA_DIR=/var/lib/echovault - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false - IN_MEMORY=false
# - TLS=false - TLS=false
# - MTLS=false - MTLS=false
# - BOOTSTRAP_CLUSTER=true - BOOTSTRAP_CLUSTER=true
# - ACL_CONFIG=/generic/config/echovault/acl.yml - ACL_CONFIG=/generic/config/echovault/acl.yml
# - REQUIRE_PASS=false - REQUIRE_PASS=false
# - FORWARD_COMMAND=true - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false - RESTORE_AOF=false
# - AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
# - MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
# - EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
# # List of server cert/key pairs # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities # List of client certificate authorities
# - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt
# ports: ports:
# - "7481:7480" - "7481:7480"
# - "7945:7946" - "7945:7946"
# - "8000:8000" - "8000:8000"
# volumes: volumes:
# - ./config/acl.yml:/generic/config/echovault/acl.yml - ./config/acl.yml:/generic/config/echovault/acl.yml
# - ./volumes/cluster_node_1:/var/lib/echovault - ./volumes/cluster_node_1:/var/lib/echovault
# networks: networks:
# - testnet - testnet
#
# cluster_node_2: cluster_node_2:
# container_name: cluster_node_2 container_name: cluster_node_2
# build: build:
# context: . context: .
# dockerfile: Dockerfile.dev dockerfile: Dockerfile.dev
# environment: environment:
# - PORT=7480 - PORT=7480
# - RAFT_PORT=8000 - RAFT_PORT=8000
# - ML_PORT=7946 - ML_PORT=7946
# - KEY=/generic/ssl/certs/echovault/server1.key - KEY=/generic/ssl/certs/echovault/server1.key
# - CERT=/generic/ssl/certs/echovault/server1.crt - CERT=/generic/ssl/certs/echovault/server1.crt
# - SERVER_ID=2 - SERVER_ID=2
# - JOIN_ADDR=cluster_node_1:7946 - JOIN_ADDR=cluster_node_1:7946
# - DATA_DIR=/var/lib/echovault - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false - IN_MEMORY=false
# - TLS=false - TLS=false
# - MTLS=false - MTLS=false
# - BOOTSTRAP_CLUSTER=false - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/generic/config/echovault/acl.yml - ACL_CONFIG=/generic/config/echovault/acl.yml
# - REQUIRE_PASS=false - REQUIRE_PASS=false
# - FORWARD_COMMAND=true - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false - RESTORE_AOF=false
# - AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
# - MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
# - EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
# # List of server cert/key pairs # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities # List of client certificate authorities
# - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt
# ports: ports:
# - "7482:7480" - "7482:7480"
# - "7947:7946" - "7947:7946"
# - "8001:8000" - "8001:8000"
# volumes: volumes:
# - ./config/acl.yml:/generic/config/echovault/acl.yml - ./config/acl.yml:/generic/config/echovault/acl.yml
# - ./volumes/cluster_node_2:/var/lib/echovault - ./volumes/cluster_node_2:/var/lib/echovault
# networks: networks:
# - testnet - testnet
#
# cluster_node_3: cluster_node_3:
# container_name: cluster_node_3 container_name: cluster_node_3
# build: build:
# context: . context: .
# dockerfile: Dockerfile.dev dockerfile: Dockerfile.dev
# environment: environment:
# - PORT=7480 - PORT=7480
# - RAFT_PORT=8000 - RAFT_PORT=8000
# - ML_PORT=7946 - ML_PORT=7946
# - KEY=/generic/ssl/certs/echovault/server1.key - KEY=/generic/ssl/certs/echovault/server1.key
# - CERT=/generic/ssl/certs/echovault/server1.crt - CERT=/generic/ssl/certs/echovault/server1.crt
# - SERVER_ID=3 - SERVER_ID=3
# - JOIN_ADDR=cluster_node_1:7946 - JOIN_ADDR=cluster_node_1:7946
# - DATA_DIR=/var/lib/echovault - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false - IN_MEMORY=false
# - TLS=false - TLS=false
# - MTLS=false - MTLS=false
# - BOOTSTRAP_CLUSTER=false - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/generic/config/echovault/acl.yml - ACL_CONFIG=/generic/config/echovault/acl.yml
# - REQUIRE_PASS=false - REQUIRE_PASS=false
# - FORWARD_COMMAND=true - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false - RESTORE_AOF=false
# - AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
# - MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
# - EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
# # List of server cert/key pairs # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities # List of client certificate authorities
# - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt
# ports: ports:
# - "7483:7480" - "7483:7480"
# - "7948:7946" - "7948:7946"
# - "8002:8000" - "8002:8000"
# volumes: volumes:
# - ./config/acl.yml:/generic/config/echovault/acl.yml - ./config/acl.yml:/generic/config/echovault/acl.yml
# - ./volumes/cluster_node_3:/var/lib/echovault - ./volumes/cluster_node_3:/var/lib/echovault
# networks: networks:
# - testnet - testnet
#
# cluster_node_4: cluster_node_4:
# container_name: cluster_node_4 container_name: cluster_node_4
# build: build:
# context: . context: .
# dockerfile: Dockerfile.dev dockerfile: Dockerfile.dev
# environment: environment:
# - PORT=7480 - PORT=7480
# - RAFT_PORT=8000 - RAFT_PORT=8000
# - ML_PORT=7946 - ML_PORT=7946
# - KEY=/generic/ssl/certs/echovault/server1.key - KEY=/generic/ssl/certs/echovault/server1.key
# - CERT=/generic/ssl/certs/echovault/server1.crt - CERT=/generic/ssl/certs/echovault/server1.crt
# - SERVER_ID=4 - SERVER_ID=4
# - JOIN_ADDR=cluster_node_1:7946 - JOIN_ADDR=cluster_node_1:7946
# - DATA_DIR=/var/lib/echovault - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false - IN_MEMORY=false
# - TLS=false - TLS=false
# - MTLS=false - MTLS=false
# - BOOTSTRAP_CLUSTER=false - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/generic/config/echovault/acl.yml - ACL_CONFIG=/generic/config/echovault/acl.yml
# - REQUIRE_PASS=false - REQUIRE_PASS=false
# - FORWARD_COMMAND=true - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false - RESTORE_AOF=false
# - AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
# - MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
# - EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
# # List of server cert/key pairs # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities # List of client certificate authorities
# - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt
# ports: ports:
# - "7484:7480" - "7484:7480"
# - "7949:7946" - "7949:7946"
# - "8003:8000" - "8003:8000"
# volumes: volumes:
# - ./config/acl.yml:/generic/config/echovault/acl.yml - ./config/acl.yml:/generic/config/echovault/acl.yml
# - ./volumes/cluster_node_4:/var/lib/echovault - ./volumes/cluster_node_4:/var/lib/echovault
# networks: networks:
# - testnet - testnet
#
# cluster_node_5: cluster_node_5:
# container_name: cluster_node_5 container_name: cluster_node_5
# build: build:
# context: . context: .
# dockerfile: Dockerfile.dev dockerfile: Dockerfile.dev
# environment: environment:
# - PORT=7480 - PORT=7480
# - RAFT_PORT=8000 - RAFT_PORT=8000
# - ML_PORT=7946 - ML_PORT=7946
# - KEY=/generic/ssl/certs/echovault/server1.key - KEY=/generic/ssl/certs/echovault/server1.key
# - CERT=/generic/ssl/certs/echovault/server1.crt - CERT=/generic/ssl/certs/echovault/server1.crt
# - SERVER_ID=5 - SERVER_ID=5
# - JOIN_ADDR=cluster_node_1:7946 - JOIN_ADDR=cluster_node_1:7946
# - DATA_DIR=/var/lib/echovault - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false - IN_MEMORY=false
# - TLS=false - TLS=false
# - MTLS=false - MTLS=false
# - BOOTSTRAP_CLUSTER=false - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/generic/config/echovault/acl.yml - ACL_CONFIG=/generic/config/echovault/acl.yml
# - REQUIRE_PASS=false - REQUIRE_PASS=false
# - FORWARD_COMMAND=true - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false - RESTORE_AOF=false
# - AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
# - MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
# - EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
# # List of server cert/key pairs # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities # List of client certificate authorities
# - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt - CLIENT_CA_1=/generic/ssl/certs/echovault/client/rootCA.crt
# ports: ports:
# - "7485:7480" - "7485:7480"
# - "7950:7946" - "7950:7946"
# - "8004:8000" - "8004:8000"
# volumes: volumes:
# - ./config/acl.yml:/generic/config/echovault/acl.yml - ./config/acl.yml:/generic/config/echovault/acl.yml
# - ./volumes/cluster_node_5:/var/lib/echovault - ./volumes/cluster_node_5:/var/lib/echovault
# networks: networks:
# - testnet - testnet

View File

@@ -21,6 +21,7 @@ type DelegateOpts struct {
addVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error addVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error
isRaftLeader func() bool isRaftLeader func() bool
applyMutate func(ctx context.Context, cmd []string) ([]byte, error) applyMutate func(ctx context.Context, cmd []string) ([]byte, error)
applyDeleteKey func(ctx context.Context, key string) error
} }
func NewDelegate(opts DelegateOpts) *Delegate { func NewDelegate(opts DelegateOpts) *Delegate {
@@ -67,6 +68,24 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) {
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
case "DeleteKey":
// If the current node is not a cluster leader, re-broadcast the message
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return
}
// Current node is the cluster leader, handle the key deletion
ctx := context.WithValue(
context.WithValue(context.Background(), utils.ContextServerID("ServerID"), string(msg.ServerID)),
utils.ContextConnID("ConnectionID"), msg.ConnId)
key := string(msg.Content)
if err := delegate.options.applyDeleteKey(ctx, key); err != nil {
log.Println(err)
}
case "MutateData": case "MutateData":
// If the current node is not a cluster leader, re-broadcast the message // If the current node is not a cluster leader, re-broadcast the message
if !delegate.options.isRaftLeader() { if !delegate.options.isRaftLeader() {

View File

@@ -19,23 +19,24 @@ type NodeMeta struct {
RaftAddr raft.ServerAddress `json:"RaftAddr"` RaftAddr raft.ServerAddress `json:"RaftAddr"`
} }
type MemberlistOpts struct { type Opts struct {
Config utils.Config Config utils.Config
HasJoinedCluster func() bool HasJoinedCluster func() bool
AddVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error AddVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error
RemoveRaftServer func(meta NodeMeta) error RemoveRaftServer func(meta NodeMeta) error
IsRaftLeader func() bool IsRaftLeader func() bool
ApplyMutate func(ctx context.Context, cmd []string) ([]byte, error) ApplyMutate func(ctx context.Context, cmd []string) ([]byte, error)
ApplyDeleteKey func(ctx context.Context, key string) error
} }
type MemberList struct { type MemberList struct {
options MemberlistOpts options Opts
broadcastQueue *memberlist.TransmitLimitedQueue broadcastQueue *memberlist.TransmitLimitedQueue
numOfNodes int numOfNodes int
memberList *memberlist.Memberlist memberList *memberlist.Memberlist
} }
func NewMemberList(opts MemberlistOpts) *MemberList { func NewMemberList(opts Opts) *MemberList {
return &MemberList{ return &MemberList{
options: opts, options: opts,
broadcastQueue: new(memberlist.TransmitLimitedQueue), broadcastQueue: new(memberlist.TransmitLimitedQueue),
@@ -53,6 +54,7 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
addVoter: m.options.AddVoter, addVoter: m.options.AddVoter,
isRaftLeader: m.options.IsRaftLeader, isRaftLeader: m.options.IsRaftLeader,
applyMutate: m.options.ApplyMutate, applyMutate: m.options.ApplyMutate,
applyDeleteKey: m.options.ApplyDeleteKey,
}) })
cfg.Events = NewEventDelegate(EventDelegateOpts{ cfg.Events = NewEventDelegate(EventDelegateOpts{
incrementNodes: func() { m.numOfNodes += 1 }, incrementNodes: func() { m.numOfNodes += 1 },
@@ -75,8 +77,8 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
if m.options.Config.JoinAddr != "" { if m.options.Config.JoinAddr != "" {
backoffPolicy := utils.RetryBackoff(retry.NewFibonacci(1*time.Second), 5, 200*time.Millisecond, 0, 0) backoffPolicy := utils.RetryBackoff(retry.NewFibonacci(1*time.Second), 5, 200*time.Millisecond, 0, 0)
err := retry.Do(ctx, backoffPolicy, func(ctx context.Context) error { err = retry.Do(ctx, backoffPolicy, func(ctx context.Context) error {
_, err := list.Join([]string{m.options.Config.JoinAddr}) _, err = list.Join([]string{m.options.Config.JoinAddr})
if err != nil { if err != nil {
return retry.RetryableError(err) return retry.RetryableError(err)
} }
@@ -103,9 +105,26 @@ func (m *MemberList) broadcastRaftAddress(ctx context.Context) {
m.broadcastQueue.QueueBroadcast(&msg) m.broadcastQueue.QueueBroadcast(&msg)
} }
// The ForwardDeleteKey function is only called by non-leaders.
// It uses the broadcast queue to forward a key eviction command within the cluster.
func (m *MemberList) ForwardDeleteKey(ctx context.Context, key string) {
connId, _ := ctx.Value(utils.ContextConnID("ConnectionID")).(string)
m.broadcastQueue.QueueBroadcast(&BroadcastMessage{
Action: "DeleteKey",
Content: []byte(key),
ContentHash: md5.Sum([]byte(key)),
ConnId: connId,
NodeMeta: NodeMeta{
ServerID: raft.ServerID(m.options.Config.ServerID),
RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d",
m.options.Config.BindAddr, m.options.Config.RaftBindPort)),
},
})
}
// The ForwardDataMutation function is only called by non-leaders.
// It uses the broadcast queue to forward a data mutation within the cluster.
func (m *MemberList) ForwardDataMutation(ctx context.Context, cmd []byte) { func (m *MemberList) ForwardDataMutation(ctx context.Context, cmd []byte) {
// This function is only called by non-leaders
// It uses the broadcast queue to forward a data mutation within the cluster
connId, _ := ctx.Value(utils.ContextConnID("ConnectionID")).(string) connId, _ := ctx.Value(utils.ContextConnID("ConnectionID")).(string)
m.broadcastQueue.QueueBroadcast(&BroadcastMessage{ m.broadcastQueue.QueueBroadcast(&BroadcastMessage{
Action: "MutateData", Action: "MutateData",

View File

@@ -15,6 +15,7 @@ type FSMOpts struct {
Config utils.Config Config utils.Config
Server utils.Server Server utils.Server
GetCommand func(command string) (utils.Command, error) GetCommand func(command string) (utils.Command, error)
DeleteKey func(ctx context.Context, key string) error
} }
type FSM struct { type FSM struct {
@@ -53,7 +54,16 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
} }
case "delete-key": case "delete-key":
// TODO: Handle key deletion if err := fsm.options.DeleteKey(ctx, request.Key); err != nil {
return utils.ApplyResponse{
Error: err,
Response: nil,
}
}
return utils.ApplyResponse{
Error: nil,
Response: []byte("OK"),
}
case "command": case "command":
// Handle command // Handle command

View File

@@ -20,6 +20,7 @@ type Opts struct {
Config utils.Config Config utils.Config
Server utils.Server Server utils.Server
GetCommand func(command string) (utils.Command, error) GetCommand func(command string) (utils.Command, error)
DeleteKey func(ctx context.Context, key string) error
} }
type Raft struct { type Raft struct {
@@ -94,6 +95,7 @@ func (r *Raft) RaftInit(ctx context.Context) {
Config: r.options.Config, Config: r.options.Config,
Server: r.options.Server, Server: r.options.Server,
GetCommand: r.options.GetCommand, GetCommand: r.options.GetCommand,
DeleteKey: r.options.DeleteKey,
}), }),
logStore, logStore,
stableStore, stableStore,

View File

@@ -85,10 +85,25 @@ func (server *Server) KeyExists(ctx context.Context, key string) bool {
} }
if entry.ExpireAt != (time.Time{}) && entry.ExpireAt.Before(time.Now()) { if entry.ExpireAt != (time.Time{}) && entry.ExpireAt.Before(time.Now()) {
if !server.IsInCluster() {
// If in standalone mode, delete the key directly.
err := server.DeleteKey(ctx, key) err := server.DeleteKey(ctx, key)
if err != nil { if err != nil {
log.Printf("keyExists: %+v\n", err) log.Printf("keyExists: %+v\n", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
// If we're in a raft cluster, and we're the leader, send command to delete the key in the cluster.
err := server.raftApplyDeleteKey(ctx, key)
if err != nil {
log.Printf("keyExists: %+v\n", err)
}
} else if server.IsInCluster() && !server.raft.IsRaftLeader() {
// Forward message to leader to initiate key deletion.
// This is always called regardless of ForwardCommand config value
// because we always want to remove expired keys.
server.memberList.ForwardDeleteKey(ctx, key)
}
return false return false
} }
@@ -251,6 +266,8 @@ func (server *Server) DeleteKey(ctx context.Context, key string) error {
server.lruCache.cache.Delete(key) server.lruCache.cache.Delete(key)
} }
log.Printf("deleted key %s\n", key)
return nil return nil
} }

View File

@@ -94,14 +94,16 @@ func NewServer(opts Opts) *Server {
Config: opts.Config, Config: opts.Config,
Server: server, Server: server,
GetCommand: server.getCommand, GetCommand: server.getCommand,
DeleteKey: server.DeleteKey,
}) })
server.memberList = memberlist.NewMemberList(memberlist.MemberlistOpts{ server.memberList = memberlist.NewMemberList(memberlist.Opts{
Config: opts.Config, Config: opts.Config,
HasJoinedCluster: server.raft.HasJoinedCluster, HasJoinedCluster: server.raft.HasJoinedCluster,
AddVoter: server.raft.AddVoter, AddVoter: server.raft.AddVoter,
RemoveRaftServer: server.raft.RemoveServer, RemoveRaftServer: server.raft.RemoveServer,
IsRaftLeader: server.raft.IsRaftLeader, IsRaftLeader: server.raft.IsRaftLeader,
ApplyMutate: server.raftApplyCommand, ApplyMutate: server.raftApplyCommand,
ApplyDeleteKey: server.raftApplyDeleteKey,
}) })
} else { } else {
// Set up standalone snapshot engine // Set up standalone snapshot engine