From 094d44c9a08ac3a0d98be3d4adfc867d50274fc7 Mon Sep 17 00:00:00 2001 From: Kelvin Clement Mwinuka Date: Sun, 30 Jul 2023 20:31:56 +0800 Subject: [PATCH] Repetetive broadcast of raft join request message until a join signal is received on a dedicated channel. Handle multiple types of Actions in NotifyMsg. Added function to check if the current server is the raft leader. --- server/main.go | 11 +++++--- server/memberlist.go | 60 +++++++++++++++++++++++++++++++++++++------- server/raft.go | 4 +++ 3 files changed, 63 insertions(+), 12 deletions(-) diff --git a/server/main.go b/server/main.go index 080ca88..3109011 100644 --- a/server/main.go +++ b/server/main.go @@ -34,7 +34,8 @@ type Server struct { broadcastQueue *memberlist.TransmitLimitedQueue numOfNodes int - cancelCh *chan (os.Signal) + cancelCh *chan (os.Signal) + raftJoinCh *chan (struct{}) } func (server *Server) Lock() { @@ -201,9 +202,10 @@ func main() { cancelCh := make(chan (os.Signal), 1) signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM) + raftJoinCh := make(chan struct{}) + server := &Server{ - cancelCh: &cancelCh, - config: config, + config: config, broadcastQueue: new(memberlist.TransmitLimitedQueue), numOfNodes: 0, @@ -213,6 +215,9 @@ func main() { NewSetGetCommand(), NewListCommand(), }, + + cancelCh: &cancelCh, + raftJoinCh: &raftJoinCh, } go server.Start() diff --git a/server/memberlist.go b/server/memberlist.go index bb26446..b3c4f14 100644 --- a/server/memberlist.go +++ b/server/memberlist.go @@ -15,8 +15,10 @@ type BroadcastMessage struct { Action string `json:"Action"` ServerID string `json:"ServerID"` ServerAddr string `json:"ServerAddr"` + Content string `json:"Content"` } +// Implements Broadcast interface func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool { mb, ok := other.(*BroadcastMessage) @@ -31,6 +33,7 @@ func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast return false } +// Implements Broadcast interface func (broadcastMessage *BroadcastMessage) Message() []byte { msg, err := json.Marshal(broadcastMessage) @@ -42,6 +45,7 @@ func (broadcastMessage *BroadcastMessage) Message() []byte { return msg } +// Implements Broadcast interface func (broadcastMessage *BroadcastMessage) Finished() { // No-Op } @@ -83,45 +87,83 @@ func (server *Server) MemberListInit() { log.Fatal(err) } - // Broadcast message to join raft cluster - msg := BroadcastMessage{ - Action: "RaftJoin", - ServerID: server.config.ServerID, - ServerAddr: "Please let me join the raft cluster", - } - server.broadcastQueue.QueueBroadcast(&msg) + go server.broadcastRaftAddress() } } +func (server *Server) broadcastRaftAddress() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + msg := BroadcastMessage{ + Action: "RaftJoin", + ServerID: server.config.ServerID, + ServerAddr: fmt.Sprintf("%s:%d", server.config.BindAddr, server.config.RaftBindPort), + } + server.broadcastQueue.QueueBroadcast(&msg) + case <-*server.raftJoinCh: + fmt.Println("Succesfully joined raft cluster.") + return + } + } +} + +// Implements Delegate interface func (server *Server) NodeMeta(limit int) []byte { return []byte("") } -func (server *Server) NotifyMsg(msg []byte) { - fmt.Println(string(msg)) +// Implements Delegate interface +func (server *Server) NotifyMsg(msgBytes []byte) { + var msg BroadcastMessage + + if err := json.Unmarshal(msgBytes, &msg); err != nil { + fmt.Print(err) + return + } + + switch msg.Action { + default: + fmt.Printf("No handler for action %s", msg.Action) + case "RaftJoin": + if server.isRaftLeader() { + fmt.Println("Asking to join the raft.") + } + case "MutateData": + // Mutate the value at a given key + case "FetchData": + // Fetch the value at a fiven key + } } +// Implements Delegate interface func (server *Server) GetBroadcasts(overhead, limit int) [][]byte { return server.broadcastQueue.GetBroadcasts(overhead, limit) } +// Implements Delegate interface func (server *Server) LocalState(join bool) []byte { // No-Op return []byte("") } +// Implements Delegate interface func (server *Server) MergeRemoteState(buf []byte, join bool) { // No-Op } +// Implements EventDelegate interface func (server *Server) NotifyJoin(node *memberlist.Node) { server.numOfNodes += 1 } +// Implements EventDelegate interface func (server *Server) NotifyLeave(node *memberlist.Node) { server.numOfNodes -= 1 } +// Implements EventDelegate interface func (server *Server) NotifyUpdate(node *memberlist.Node) { // No-Op } diff --git a/server/raft.go b/server/raft.go index 7472def..96175d2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -110,6 +110,10 @@ func (server *Server) GetUint64(key []byte) (uint64, error) { return 0, nil } +func (server *Server) isRaftLeader() bool { + return server.raft.State() == raft.Leader +} + func (server *Server) RaftShutdown() { // Triggered before MemberListShutdown // Leadership transfer if current node is the leader