diff --git a/Dockerfile b/Dockerfile index 5524a5e..d2e7a01 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,4 +26,5 @@ CMD "./server" \ "--bootstrapCluster=${BOOTSTRAP_CLUSTER}" \ "--aclConfig=${ACL_CONFIG}" \ "--requirePass=${REQUIRE_PASS}" \ - "--password=${PASSWORD}" \ \ No newline at end of file + "--password=${PASSWORD}" \ + "--forwardCommand=${FORWARD_COMMAND}" \ \ No newline at end of file diff --git a/src/main.go b/src/main.go index 53990db..61f297e 100644 --- a/src/main.go +++ b/src/main.go @@ -254,11 +254,13 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { connRW.Write(r.Response) connRW.Flush() - - // TODO: Add command to AOF + } else if server.config.ForwardCommand { + // Forward message to leader and return immediate OK response + server.memberList.ForwardDataMutation(ctx, message) + connRW.Write([]byte(utils.OK_RESPONSE)) + connRW.Flush() } else { - // TODO: Forward message to leader and wait for a response - connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n")) + connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\r\n")) connRW.Flush() } } @@ -378,6 +380,7 @@ func (server *Server) Start(ctx context.Context) { HasJoinedCluster: server.raft.HasJoinedCluster, AddVoter: server.raft.AddVoter, RemoveRaftServer: server.raft.RemoveServer, + IsRaftLeader: server.raft.IsRaftLeader, }) server.raft.RaftInit(ctx) server.memberList.MemberListInit(ctx) diff --git a/src/memberlist_layer/broadcast.go b/src/memberlist_layer/broadcast.go index bc15377..35625d7 100644 --- a/src/memberlist_layer/broadcast.go +++ b/src/memberlist_layer/broadcast.go @@ -8,23 +8,27 @@ import ( type BroadcastMessage struct { NodeMeta - Action string `json:"Action"` - Content string `json:"Content"` + Action string `json:"Action"` + Content string `json:"Content"` + ContentHash string `json:"ContentHash"` } // Invalidates Implements Broadcast interface func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool { - mb, ok := other.(*BroadcastMessage) + otherBroadcast, ok := other.(*BroadcastMessage) if !ok { return false } - if mb.ServerID == broadcastMessage.ServerID && mb.Action == "RaftJoin" { - return true + switch broadcastMessage.Action { + case "RaftJoin": + return broadcastMessage.Action == otherBroadcast.Action && broadcastMessage.ServerID == otherBroadcast.ServerID + case "MutateData": + return broadcastMessage.Action == otherBroadcast.Action && broadcastMessage.ContentHash == otherBroadcast.ContentHash + default: + return false } - - return false } // Message Implements Broadcast interface diff --git a/src/memberlist_layer/delegate.go b/src/memberlist_layer/delegate.go index f9c9d8b..5211296 100644 --- a/src/memberlist_layer/delegate.go +++ b/src/memberlist_layer/delegate.go @@ -17,6 +17,7 @@ type DelegateOpts struct { config utils.Config broadcastQueue *memberlist.TransmitLimitedQueue addVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error + isRaftLeader func() bool } func NewDelegate(opts DelegateOpts) *Delegate { @@ -54,11 +55,23 @@ func (delegate *Delegate) NotifyMsg(msgBytes []byte) { switch msg.Action { case "RaftJoin": - if err := delegate.options.addVoter(msg.NodeMeta.ServerID, msg.NodeMeta.RaftAddr, 0, 0); err != nil { + // If the current node is not the cluster leader, re-broadcast the message + if !delegate.options.isRaftLeader() { + delegate.options.broadcastQueue.QueueBroadcast(&msg) + return + } + err := delegate.options.addVoter(msg.NodeMeta.ServerID, msg.NodeMeta.RaftAddr, 0, 0) + if err != nil { fmt.Println(err) } case "MutateData": - // Mutate the value at a given key + // If the current node is not a cluster leader, re-broadcast the message + if !delegate.options.isRaftLeader() { + delegate.options.broadcastQueue.QueueBroadcast(&msg) + return + } + // Current node is the cluster leader, handle the mutation + fmt.Println(msg) } } diff --git a/src/memberlist_layer/event_delegate.go b/src/memberlist_layer/event_delegate.go index d96c0ce..ba9a3f3 100644 --- a/src/memberlist_layer/event_delegate.go +++ b/src/memberlist_layer/event_delegate.go @@ -11,9 +11,9 @@ type EventDelegate struct { } type EventDelegateOpts struct { - IncrementNodes func() - DecrementNodes func() - RemoveRaftServer func(meta NodeMeta) error + incrementNodes func() + decrementNodes func() + removeRaftServer func(meta NodeMeta) error } func NewEventDelegate(opts EventDelegateOpts) *EventDelegate { @@ -24,12 +24,12 @@ func NewEventDelegate(opts EventDelegateOpts) *EventDelegate { // NotifyJoin implements EventDelegate interface func (eventDelegate *EventDelegate) NotifyJoin(node *memberlist.Node) { - eventDelegate.options.IncrementNodes() + eventDelegate.options.incrementNodes() } // NotifyLeave implements EventDelegate interface func (eventDelegate *EventDelegate) NotifyLeave(node *memberlist.Node) { - eventDelegate.options.DecrementNodes() + eventDelegate.options.decrementNodes() var meta NodeMeta @@ -40,7 +40,7 @@ func (eventDelegate *EventDelegate) NotifyLeave(node *memberlist.Node) { return } - err = eventDelegate.options.RemoveRaftServer(meta) + err = eventDelegate.options.removeRaftServer(meta) if err != nil { fmt.Println(err) diff --git a/src/memberlist_layer/memberlist.go b/src/memberlist_layer/memberlist.go index 929c9b0..14206bd 100644 --- a/src/memberlist_layer/memberlist.go +++ b/src/memberlist_layer/memberlist.go @@ -23,6 +23,7 @@ type MemberlistOpts struct { HasJoinedCluster func() bool AddVoter func(id raft.ServerID, address raft.ServerAddress, prevIndex uint64, timeout time.Duration) error RemoveRaftServer func(meta NodeMeta) error + IsRaftLeader func() bool } type MemberList struct { @@ -48,11 +49,12 @@ func (m *MemberList) MemberListInit(ctx context.Context) { config: m.options.Config, broadcastQueue: m.broadcastQueue, addVoter: m.options.AddVoter, + isRaftLeader: m.options.IsRaftLeader, }) cfg.Events = NewEventDelegate(EventDelegateOpts{ - IncrementNodes: func() { m.numOfNodes += 1 }, - DecrementNodes: func() { m.numOfNodes -= 1 }, - RemoveRaftServer: m.options.RemoveRaftServer, + incrementNodes: func() { m.numOfNodes += 1 }, + decrementNodes: func() { m.numOfNodes -= 1 }, + removeRaftServer: m.options.RemoveRaftServer, }) m.broadcastQueue.RetransmitMult = 1 @@ -82,31 +84,29 @@ func (m *MemberList) MemberListInit(ctx context.Context) { log.Fatal(err) } - go m.broadcastRaftAddress(ctx) + m.broadcastRaftAddress(ctx) } } func (m *MemberList) broadcastRaftAddress(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) - - for { - msg := BroadcastMessage{ - Action: "RaftJoin", - NodeMeta: NodeMeta{ - ServerID: raft.ServerID(m.options.Config.ServerID), - RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d", - m.options.Config.BindAddr, m.options.Config.RaftBindPort)), - }, - } - - if m.options.HasJoinedCluster() { - return - } - - m.broadcastQueue.QueueBroadcast(&msg) - - <-ticker.C + msg := BroadcastMessage{ + Action: "RaftJoin", + NodeMeta: NodeMeta{ + ServerID: raft.ServerID(m.options.Config.ServerID), + RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d", + m.options.Config.BindAddr, m.options.Config.RaftBindPort)), + }, } + m.broadcastQueue.QueueBroadcast(&msg) +} + +func (m *MemberList) ForwardDataMutation(ctx context.Context, cmd string) { + // This function is only called by non-leaders + // It uses the broadcast queue to forward a data mutation within the cluster + m.broadcastQueue.QueueBroadcast(&BroadcastMessage{ + Action: "MutateData", + Content: cmd, + }) } func (m *MemberList) MemberListShutdown(ctx context.Context) { diff --git a/src/raft_layer/raft.go b/src/raft_layer/raft.go index 05dacb4..5b0e855 100644 --- a/src/raft_layer/raft.go +++ b/src/raft_layer/raft.go @@ -153,24 +153,23 @@ func (r *Raft) AddVoter( prevIndex uint64, timeout time.Duration, ) error { - if !r.IsRaftLeader() { - return errors.New("not leader, cannot add voter") - } - raftConfig := r.raft.GetConfiguration() - if err := raftConfig.Error(); err != nil { - return errors.New("could not retrieve raft config") - } - - for _, s := range raftConfig.Configuration().Servers { - // Check if a server already exists with the current attributes - if s.ID == id && s.Address == address { - return fmt.Errorf("server with id %s and address %s already exists", id, address) + if r.IsRaftLeader() { + raftConfig := r.raft.GetConfiguration() + if err := raftConfig.Error(); err != nil { + return errors.New("could not retrieve raft config") } - } - err := r.raft.AddVoter(id, address, prevIndex, timeout).Error() - if err != nil { - return err + for _, s := range raftConfig.Configuration().Servers { + // Check if a server already exists with the current attributes + if s.ID == id && s.Address == address { + return fmt.Errorf("server with id %s and address %s already exists", id, address) + } + } + + err := r.raft.AddVoter(id, address, prevIndex, timeout).Error() + if err != nil { + return err + } } return nil diff --git a/src/utils/config.go b/src/utils/config.go index c76238a..1e84f58 100644 --- a/src/utils/config.go +++ b/src/utils/config.go @@ -26,6 +26,7 @@ type Config struct { DataDir string `json:"dataDir" yaml:"dataDir"` BootstrapCluster bool `json:"BootstrapCluster" yaml:"bootstrapCluster"` AclConfig string `json:"AclConfig" yaml:"AclConfig"` + ForwardCommand bool `json:"forwardCommand" yaml:"forwardCommand"` RequirePass bool `json:"requirePass" yaml:"requirePass"` Password string `json:"password" yaml:"password"` } @@ -42,10 +43,14 @@ func GetConfig() (Config, error) { bindAddr := flag.String("bindAddr", "", "Address to bind the server to.") raftBindPort := flag.Int("raftPort", 7481, "Port to use for intra-cluster communication. Leave on the client.") mlBindPort := flag.Int("mlPort", 7946, "Port to use for memberlist communication.") - inMemory := flag.Bool("inMemory", false, "Whether to use memory or persisten storage for raft logs and snapshots.") + inMemory := flag.Bool("inMemory", false, "Whether to use memory or persistent storage for raft logs and snapshots.") dataDir := flag.String("dataDir", "/var/lib/memstore", "Directory to store raft snapshots and logs.") bootstrapCluster := flag.Bool("bootstrapCluster", false, "Whether this instance should bootstrap a new cluster.") aclConfig := flag.String("aclConfig", "", "ACL config file path.") + forwardCommand := flag.Bool( + "forwardCommand", + false, + "If the node is a follower, this flag forwards mutation command to the leader when set to true") requirePass := flag.Bool( "requirePass", false, @@ -82,6 +87,7 @@ It is a plain text value by default but you can provide a SHA256 hash by adding DataDir: *dataDir, BootstrapCluster: *bootstrapCluster, AclConfig: *aclConfig, + ForwardCommand: *forwardCommand, RequirePass: *requirePass, Password: *password, }