Repetetive broadcast of raft join request message until a join signal is received on a dedicated channel.

Handle multiple types of Actions in NotifyMsg.
Added function to check if the current server is the raft leader.
This commit is contained in:
Kelvin Clement Mwinuka
2023-07-30 20:31:56 +08:00
parent d82a6a98d7
commit 094d44c9a0
3 changed files with 63 additions and 12 deletions

View File

@@ -35,6 +35,7 @@ type Server struct {
numOfNodes int numOfNodes int
cancelCh *chan (os.Signal) cancelCh *chan (os.Signal)
raftJoinCh *chan (struct{})
} }
func (server *Server) Lock() { func (server *Server) Lock() {
@@ -201,8 +202,9 @@ func main() {
cancelCh := make(chan (os.Signal), 1) cancelCh := make(chan (os.Signal), 1)
signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM) signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM)
raftJoinCh := make(chan struct{})
server := &Server{ server := &Server{
cancelCh: &cancelCh,
config: config, config: config,
broadcastQueue: new(memberlist.TransmitLimitedQueue), broadcastQueue: new(memberlist.TransmitLimitedQueue),
@@ -213,6 +215,9 @@ func main() {
NewSetGetCommand(), NewSetGetCommand(),
NewListCommand(), NewListCommand(),
}, },
cancelCh: &cancelCh,
raftJoinCh: &raftJoinCh,
} }
go server.Start() go server.Start()

View File

@@ -15,8 +15,10 @@ type BroadcastMessage struct {
Action string `json:"Action"` Action string `json:"Action"`
ServerID string `json:"ServerID"` ServerID string `json:"ServerID"`
ServerAddr string `json:"ServerAddr"` ServerAddr string `json:"ServerAddr"`
Content string `json:"Content"`
} }
// Implements Broadcast interface
func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool { func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool {
mb, ok := other.(*BroadcastMessage) mb, ok := other.(*BroadcastMessage)
@@ -31,6 +33,7 @@ func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast
return false return false
} }
// Implements Broadcast interface
func (broadcastMessage *BroadcastMessage) Message() []byte { func (broadcastMessage *BroadcastMessage) Message() []byte {
msg, err := json.Marshal(broadcastMessage) msg, err := json.Marshal(broadcastMessage)
@@ -42,6 +45,7 @@ func (broadcastMessage *BroadcastMessage) Message() []byte {
return msg return msg
} }
// Implements Broadcast interface
func (broadcastMessage *BroadcastMessage) Finished() { func (broadcastMessage *BroadcastMessage) Finished() {
// No-Op // No-Op
} }
@@ -83,45 +87,83 @@ func (server *Server) MemberListInit() {
log.Fatal(err) log.Fatal(err)
} }
// Broadcast message to join raft cluster go server.broadcastRaftAddress()
msg := BroadcastMessage{
Action: "RaftJoin",
ServerID: server.config.ServerID,
ServerAddr: "Please let me join the raft cluster",
}
server.broadcastQueue.QueueBroadcast(&msg)
} }
} }
func (server *Server) broadcastRaftAddress() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
msg := BroadcastMessage{
Action: "RaftJoin",
ServerID: server.config.ServerID,
ServerAddr: fmt.Sprintf("%s:%d", server.config.BindAddr, server.config.RaftBindPort),
}
server.broadcastQueue.QueueBroadcast(&msg)
case <-*server.raftJoinCh:
fmt.Println("Succesfully joined raft cluster.")
return
}
}
}
// Implements Delegate interface
func (server *Server) NodeMeta(limit int) []byte { func (server *Server) NodeMeta(limit int) []byte {
return []byte("") return []byte("")
} }
func (server *Server) NotifyMsg(msg []byte) { // Implements Delegate interface
fmt.Println(string(msg)) func (server *Server) NotifyMsg(msgBytes []byte) {
var msg BroadcastMessage
if err := json.Unmarshal(msgBytes, &msg); err != nil {
fmt.Print(err)
return
} }
switch msg.Action {
default:
fmt.Printf("No handler for action %s", msg.Action)
case "RaftJoin":
if server.isRaftLeader() {
fmt.Println("Asking to join the raft.")
}
case "MutateData":
// Mutate the value at a given key
case "FetchData":
// Fetch the value at a fiven key
}
}
// Implements Delegate interface
func (server *Server) GetBroadcasts(overhead, limit int) [][]byte { func (server *Server) GetBroadcasts(overhead, limit int) [][]byte {
return server.broadcastQueue.GetBroadcasts(overhead, limit) return server.broadcastQueue.GetBroadcasts(overhead, limit)
} }
// Implements Delegate interface
func (server *Server) LocalState(join bool) []byte { func (server *Server) LocalState(join bool) []byte {
// No-Op // No-Op
return []byte("") return []byte("")
} }
// Implements Delegate interface
func (server *Server) MergeRemoteState(buf []byte, join bool) { func (server *Server) MergeRemoteState(buf []byte, join bool) {
// No-Op // No-Op
} }
// Implements EventDelegate interface
func (server *Server) NotifyJoin(node *memberlist.Node) { func (server *Server) NotifyJoin(node *memberlist.Node) {
server.numOfNodes += 1 server.numOfNodes += 1
} }
// Implements EventDelegate interface
func (server *Server) NotifyLeave(node *memberlist.Node) { func (server *Server) NotifyLeave(node *memberlist.Node) {
server.numOfNodes -= 1 server.numOfNodes -= 1
} }
// Implements EventDelegate interface
func (server *Server) NotifyUpdate(node *memberlist.Node) { func (server *Server) NotifyUpdate(node *memberlist.Node) {
// No-Op // No-Op
} }

View File

@@ -110,6 +110,10 @@ func (server *Server) GetUint64(key []byte) (uint64, error) {
return 0, nil return 0, nil
} }
func (server *Server) isRaftLeader() bool {
return server.raft.State() == raft.Leader
}
func (server *Server) RaftShutdown() { func (server *Server) RaftShutdown() {
// Triggered before MemberListShutdown // Triggered before MemberListShutdown
// Leadership transfer if current node is the leader // Leadership transfer if current node is the leader