Refactored broadcast for joining raft cluster. Instead of broadcasting repeatedly with a ticker, the message will be relayed using gossip protocol until it reaches the raft leader.

Created scaffolding for forwarding mutation commands from replica nodes to leader node.
This commit is contained in:
Kelvin Clement Mwinuka
2024-01-19 16:40:55 +08:00
parent c82560294d
commit 1ac941151f
8 changed files with 86 additions and 60 deletions

View File

@@ -27,3 +27,4 @@ CMD "./server" \
"--aclConfig=${ACL_CONFIG}" \
"--requirePass=${REQUIRE_PASS}" \
"--password=${PASSWORD}" \
"--forwardCommand=${FORWARD_COMMAND}" \

View File

@@ -254,11 +254,13 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) {
connRW.Write(r.Response)
connRW.Flush()
// TODO: Add command to AOF
} else if server.config.ForwardCommand {
// Forward message to leader and return immediate OK response
server.memberList.ForwardDataMutation(ctx, message)
connRW.Write([]byte(utils.OK_RESPONSE))
connRW.Flush()
} else {
// TODO: Forward message to leader and wait for a response
connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n"))
connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\r\n"))
connRW.Flush()
}
}
@@ -378,6 +380,7 @@ func (server *Server) Start(ctx context.Context) {
HasJoinedCluster: server.raft.HasJoinedCluster,
AddVoter: server.raft.AddVoter,
RemoveRaftServer: server.raft.RemoveServer,
IsRaftLeader: server.raft.IsRaftLeader,
})
server.raft.RaftInit(ctx)
server.memberList.MemberListInit(ctx)

View File

@@ -10,21 +10,25 @@ type BroadcastMessage struct {
NodeMeta
Action string `json:"Action"`
Content string `json:"Content"`
ContentHash string `json:"ContentHash"`
}
// Invalidates Implements Broadcast interface
func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool {
mb, ok := other.(*BroadcastMessage)
otherBroadcast, ok := other.(*BroadcastMessage)
if !ok {
return false
}
if mb.ServerID == broadcastMessage.ServerID && mb.Action == "RaftJoin" {
return true
}
switch broadcastMessage.Action {
case "RaftJoin":
return broadcastMessage.Action == otherBroadcast.Action && broadcastMessage.ServerID == otherBroadcast.ServerID
case "MutateData":
return broadcastMessage.Action == otherBroadcast.Action && broadcastMessage.ContentHash == otherBroadcast.ContentHash
default:
return false
}
}
// Message Implements Broadcast interface

View File

@@ -17,6 +17,7 @@ type DelegateOpts struct {
config utils.Config
broadcastQueue *memberlist.TransmitLimitedQueue
addVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error
isRaftLeader func() bool
}
func NewDelegate(opts DelegateOpts) *Delegate {
@@ -54,11 +55,23 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) {
switch msg.Action {
case "RaftJoin":
if err := delegate.options.addVoter(msg.NodeMeta.ServerID, msg.NodeMeta.RaftAddr, 0, 0); err != nil {
// If the current node is not the cluster leader, re-broadcast the message
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return
}
err := delegate.options.addVoter(msg.NodeMeta.ServerID, msg.NodeMeta.RaftAddr, 0, 0)
if err != nil {
fmt.Println(err)
}
case "MutateData":
// Mutate the value at a given key
// If the current node is not a cluster leader, re-broadcast the message
if !delegate.options.isRaftLeader() {
delegate.options.broadcastQueue.QueueBroadcast(&msg)
return
}
// Current node is the cluster leader, handle the mutation
fmt.Println(msg)
}
}

View File

@@ -11,9 +11,9 @@ type EventDelegate struct {
}
type EventDelegateOpts struct {
IncrementNodes func()
DecrementNodes func()
RemoveRaftServer func(meta NodeMeta) error
incrementNodes func()
decrementNodes func()
removeRaftServer func(meta NodeMeta) error
}
func NewEventDelegate(opts EventDelegateOpts) *EventDelegate {
@@ -24,12 +24,12 @@ func NewEventDelegate(opts EventDelegateOpts) *EventDelegate {
// NotifyJoin implements EventDelegate interface
func (eventDelegate *EventDelegate) NotifyJoin(node *memberlist.Node) {
eventDelegate.options.IncrementNodes()
eventDelegate.options.incrementNodes()
}
// NotifyLeave implements EventDelegate interface
func (eventDelegate *EventDelegate) NotifyLeave(node *memberlist.Node) {
eventDelegate.options.DecrementNodes()
eventDelegate.options.decrementNodes()
var meta NodeMeta
@@ -40,7 +40,7 @@ func (eventDelegate *EventDelegate) NotifyLeave(node *memberlist.Node) {
return
}
err = eventDelegate.options.RemoveRaftServer(meta)
err = eventDelegate.options.removeRaftServer(meta)
if err != nil {
fmt.Println(err)

View File

@@ -23,6 +23,7 @@ type MemberlistOpts struct {
HasJoinedCluster func() bool
AddVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error
RemoveRaftServer func(meta NodeMeta) error
IsRaftLeader func() bool
}
type MemberList struct {
@@ -48,11 +49,12 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
config: m.options.Config,
broadcastQueue: m.broadcastQueue,
addVoter: m.options.AddVoter,
isRaftLeader: m.options.IsRaftLeader,
})
cfg.Events = NewEventDelegate(EventDelegateOpts{
IncrementNodes: func() { m.numOfNodes += 1 },
DecrementNodes: func() { m.numOfNodes -= 1 },
RemoveRaftServer: m.options.RemoveRaftServer,
incrementNodes: func() { m.numOfNodes += 1 },
decrementNodes: func() { m.numOfNodes -= 1 },
removeRaftServer: m.options.RemoveRaftServer,
})
m.broadcastQueue.RetransmitMult = 1
@@ -82,14 +84,11 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
log.Fatal(err)
}
go m.broadcastRaftAddress(ctx)
m.broadcastRaftAddress(ctx)
}
}
func (m *MemberList) broadcastRaftAddress(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
for {
msg := BroadcastMessage{
Action: "RaftJoin",
NodeMeta: NodeMeta{
@@ -98,15 +97,16 @@ func (m *MemberList) broadcastRaftAddress(ctx context.Context) {
m.options.Config.BindAddr, m.options.Config.RaftBindPort)),
},
}
if m.options.HasJoinedCluster() {
return
}
m.broadcastQueue.QueueBroadcast(&msg)
}
<-ticker.C
}
func (m *MemberList) ForwardDataMutation(ctx context.Context, cmd string) {
// This function is only called by non-leaders
// It uses the broadcast queue to forward a data mutation within the cluster
m.broadcastQueue.QueueBroadcast(&BroadcastMessage{
Action: "MutateData",
Content: cmd,
})
}
func (m *MemberList) MemberListShutdown(ctx context.Context) {

View File

@@ -153,9 +153,7 @@ func (r *Raft) AddVoter(
prevIndex uint64,
timeout time.Duration,
) error {
if !r.IsRaftLeader() {
return errors.New("not leader, cannot add voter")
}
if r.IsRaftLeader() {
raftConfig := r.raft.GetConfiguration()
if err := raftConfig.Error(); err != nil {
return errors.New("could not retrieve raft config")
@@ -172,6 +170,7 @@ func (r *Raft) AddVoter(
if err != nil {
return err
}
}
return nil
}

View File

@@ -26,6 +26,7 @@ type Config struct {
DataDir string `json:"dataDir" yaml:"dataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"bootstrapCluster"`
AclConfig string `json:"AclConfig" yaml:"AclConfig"`
ForwardCommand bool `json:"forwardCommand" yaml:"forwardCommand"`
RequirePass bool `json:"requirePass" yaml:"requirePass"`
Password string `json:"password" yaml:"password"`
}
@@ -42,10 +43,14 @@ func GetConfig() (Config, error) {
bindAddr := flag.String("bindAddr", "", "Address to bind the server to.")
raftBindPort := flag.Int("raftPort", 7481, "Port to use for intra-cluster communication. Leave on the client.")
mlBindPort := flag.Int("mlPort", 7946, "Port to use for memberlist communication.")
inMemory := flag.Bool("inMemory", false, "Whether to use memory or persisten storage for raft logs and snapshots.")
inMemory := flag.Bool("inMemory", false, "Whether to use memory or persistent storage for raft logs and snapshots.")
dataDir := flag.String("dataDir", "/var/lib/memstore", "Directory to store raft snapshots and logs.")
bootstrapCluster := flag.Bool("bootstrapCluster", false, "Whether this instance should bootstrap a new cluster.")
aclConfig := flag.String("aclConfig", "", "ACL config file path.")
forwardCommand := flag.Bool(
"forwardCommand",
false,
"If the node is a follower, this flag forwards mutation command to the leader when set to true")
requirePass := flag.Bool(
"requirePass",
false,
@@ -82,6 +87,7 @@ It is a plain text value by default but you can provide a SHA256 hash by adding
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
AclConfig: *aclConfig,
ForwardCommand: *forwardCommand,
RequirePass: *requirePass,
Password: *password,
}