diff --git a/cluster/cluster.go b/cluster/cluster.go index d9bf73a..9ff4a20 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,234 +2,38 @@ package cluster import ( - "fmt" - "runtime/debug" - "strings" - - "github.com/hdt3213/rdb/core" - - "github.com/hdt3213/godis/config" - database2 "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/datastruct/dict" - "github.com/hdt3213/godis/datastruct/set" - "github.com/hdt3213/godis/interface/database" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/idgenerator" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" "os" "path" - "sync" + + _ "github.com/hdt3213/godis/cluster/commands" // register nodes + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/cluster/raft" + "github.com/hdt3213/godis/config" + "github.com/hdt3213/godis/lib/logger" ) -// Cluster represents a node of godis cluster -// it holds part of data and coordinates other nodes to finish transactions -type Cluster struct { - self string - addr string - db database.DBEngine - transactions *dict.SimpleDict // id -> Transaction - transactionMu sync.RWMutex - topology topology - slotMu sync.RWMutex - slots map[uint32]*hostSlot - idGenerator *idgenerator.IDGenerator - - clientFactory clientFactory -} - -type peerClient interface { - Send(args [][]byte) redis.Reply -} - -type peerStream interface { - Stream() <-chan *parser.Payload - Close() error -} - -type clientFactory interface { - GetPeerClient(peerAddr string) (peerClient, error) - ReturnPeerClient(peerAddr string, peerClient peerClient) error - NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error) - Close() error -} - -const ( - slotStateHost = iota - slotStateImporting - slotStateMovingOut -) - -// hostSlot stores status of host which hosted by current node -type hostSlot struct { - state uint32 - mu sync.RWMutex - // OldNodeID is the node which is moving out this slot - // only valid during slot is importing - oldNodeID string - // OldNodeID is the node which is importing this slot - // only valid during slot is moving out - newNodeID string - - /* importedKeys stores imported keys during migrating progress - * While this slot is migrating, if importedKeys does not have the given key, then current node will import key before execute commands - * - * In a migrating slot, the slot on the old node is immutable, we only delete a key in the new node. - * Therefore, we must distinguish between non-migrated key and deleted key. - * Even if a key has been deleted, it still exists in importedKeys, so we can distinguish between non-migrated and deleted. - */ - importedKeys *set.Set - // keys stores all keys in this slot - // Cluster.makeInsertCallback and Cluster.makeDeleteCallback will keep keys up to time - keys *set.Set -} - -// if only one node involved in a transaction, just execute the command don't apply tcc procedure -var allowFastTransaction = true +type Cluster = core.Cluster // MakeCluster creates and starts a node of cluster func MakeCluster() *Cluster { - cluster := &Cluster{ - self: config.Properties.Self, - addr: config.Properties.AnnounceAddress(), - db: database2.NewStandaloneServer(), - transactions: dict.MakeSimple(), - idGenerator: idgenerator.MakeGenerator(config.Properties.Self), - clientFactory: newDefaultClientFactory(), - } - topologyPersistFile := path.Join(config.Properties.Dir, config.Properties.ClusterConfigFile) - cluster.topology = newRaft(cluster, topologyPersistFile) - cluster.db.SetKeyInsertedCallback(cluster.makeInsertCallback()) - cluster.db.SetKeyDeletedCallback(cluster.makeDeleteCallback()) - cluster.slots = make(map[uint32]*hostSlot) - var err error - if topologyPersistFile != "" && fileExists(topologyPersistFile) { - err = cluster.LoadConfig() - } else if config.Properties.ClusterAsSeed { - err = cluster.startAsSeed(config.Properties.AnnounceAddress()) - } else { - err = cluster.Join(config.Properties.ClusterSeed) - } + raftPath := path.Join(config.Properties.Dir, "raft") + err := os.MkdirAll(raftPath, os.ModePerm) if err != nil { panic(err) } + cluster, err := core.NewCluster(&core.Config{ + RaftConfig: raft.RaftConfig{ + RedisAdvertiseAddr: config.Properties.AnnounceAddress(), + RaftListenAddr: config.Properties.RaftListenAddr, + RaftAdvertiseAddr: config.Properties.RaftAdvertiseAddr, + Dir: raftPath, + }, + StartAsSeed: config.Properties.ClusterAsSeed, + JoinAddress: config.Properties.ClusterSeed, + }) + if err != nil { + logger.Error(err.Error()) + panic(err) + } return cluster } - -// CmdFunc represents the handler of a redis command -type CmdFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply - -// Close stops current node of cluster -func (cluster *Cluster) Close() { - _ = cluster.topology.Close() - cluster.db.Close() - cluster.clientFactory.Close() -} - -func isAuthenticated(c redis.Connection) bool { - if config.Properties.RequirePass == "" { - return true - } - return c.GetPassword() == config.Properties.RequirePass -} - -// Exec executes command on cluster -func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) { - defer func() { - if err := recover(); err != nil { - logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) - result = &protocol.UnknownErrReply{} - } - }() - cmdName := strings.ToLower(string(cmdLine[0])) - if cmdName == "info" { - if ser, ok := cluster.db.(*database2.Server); ok { - return database2.Info(ser, cmdLine[1:]) - } - } - if cmdName == "auth" { - return database2.Auth(c, cmdLine[1:]) - } - if !isAuthenticated(c) { - return protocol.MakeErrReply("NOAUTH Authentication required") - } - - if cmdName == "dbsize" { - if ser, ok := cluster.db.(*database2.Server); ok { - return database2.DbSize(c, ser) - } - } - - if cmdName == "multi" { - if len(cmdLine) != 1 { - return protocol.MakeArgNumErrReply(cmdName) - } - return database2.StartMulti(c) - } else if cmdName == "discard" { - if len(cmdLine) != 1 { - return protocol.MakeArgNumErrReply(cmdName) - } - return database2.DiscardMulti(c) - } else if cmdName == "exec" { - if len(cmdLine) != 1 { - return protocol.MakeArgNumErrReply(cmdName) - } - return execMulti(cluster, c, nil) - } else if cmdName == "select" { - return protocol.MakeErrReply("select not supported in cluster") - } - if c != nil && c.InMultiState() { - return database2.EnqueueCmd(c, cmdLine) - } - cmdFunc, ok := router[cmdName] - if !ok { - return protocol.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode") - } - result = cmdFunc(cluster, c, cmdLine) - return -} - -// AfterClientClose does some clean after client close connection -func (cluster *Cluster) AfterClientClose(c redis.Connection) { - cluster.db.AfterClientClose(c) -} - -func (cluster *Cluster) LoadRDB(dec *core.Decoder) error { - return cluster.db.LoadRDB(dec) -} - -func (cluster *Cluster) makeInsertCallback() database.KeyEventCallback { - return func(dbIndex int, key string, entity *database.DataEntity) { - slotId := getSlot(key) - cluster.slotMu.RLock() - slot, ok := cluster.slots[slotId] - cluster.slotMu.RUnlock() - // As long as the command is executed, we should update slot.keys regardless of slot.state - if ok { - slot.mu.Lock() - defer slot.mu.Unlock() - slot.keys.Add(key) - } - } -} - -func (cluster *Cluster) makeDeleteCallback() database.KeyEventCallback { - return func(dbIndex int, key string, entity *database.DataEntity) { - slotId := getSlot(key) - cluster.slotMu.RLock() - slot, ok := cluster.slots[slotId] - cluster.slotMu.RUnlock() - // As long as the command is executed, we should update slot.keys regardless of slot.state - if ok { - slot.mu.Lock() - defer slot.mu.Unlock() - slot.keys.Remove(key) - } - } -} - -func fileExists(filename string) bool { - info, err := os.Stat(filename) - return err == nil && !info.IsDir() -} diff --git a/cluster/com.go b/cluster/com.go deleted file mode 100644 index c667d81..0000000 --- a/cluster/com.go +++ /dev/null @@ -1,86 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" -) - -// relay function relays command to peer or calls cluster.Exec -func (cluster *Cluster) relay(peerId string, c redis.Connection, cmdLine [][]byte) redis.Reply { - // use a variable to allow injecting stub for testing, see defaultRelayImpl - if peerId == cluster.self { - // to self db - return cluster.Exec(c, cmdLine) - } - // peerId is peer.Addr - cli, err := cluster.clientFactory.GetPeerClient(peerId) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - defer func() { - _ = cluster.clientFactory.ReturnPeerClient(peerId, cli) - }() - return cli.Send(cmdLine) -} - -// relayByKey function relays command to peer -// use routeKey to determine peer node -func (cluster *Cluster) relayByKey(routeKey string, c redis.Connection, args [][]byte) redis.Reply { - slotId := getSlot(routeKey) - peer := cluster.pickNode(slotId) - return cluster.relay(peer.ID, c, args) -} - -// broadcast function broadcasts command to all node in cluster -func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply { - result := make(map[string]redis.Reply) - for _, node := range cluster.topology.GetNodes() { - reply := cluster.relay(node.ID, c, args) - result[node.Addr] = reply - } - return result -} - -// ensureKey will migrate key to current node if the key is in a slot migrating to current node -// invoker should provide with locks of key -func (cluster *Cluster) ensureKey(key string) protocol.ErrorReply { - slotId := getSlot(key) - cluster.slotMu.RLock() - slot := cluster.slots[slotId] - cluster.slotMu.RUnlock() - if slot == nil { - return nil - } - if slot.state != slotStateImporting || slot.importedKeys.Has(key) { - return nil - } - resp := cluster.relay(slot.oldNodeID, connection.NewFakeConn(), utils.ToCmdLine("DumpKey_", key)) - if protocol.IsErrorReply(resp) { - return resp.(protocol.ErrorReply) - } - if protocol.IsEmptyMultiBulkReply(resp) { - slot.importedKeys.Add(key) - return nil - } - dumpResp := resp.(*protocol.MultiBulkReply) - if len(dumpResp.Args) != 2 { - return protocol.MakeErrReply("illegal dump key response") - } - // reuse copy to command ^_^ - resp = cluster.db.Exec(connection.NewFakeConn(), [][]byte{ - []byte("CopyTo"), []byte(key), dumpResp.Args[0], dumpResp.Args[1], - }) - if protocol.IsErrorReply(resp) { - return resp.(protocol.ErrorReply) - } - slot.importedKeys.Add(key) - return nil -} - -func (cluster *Cluster) ensureKeyWithoutLock(key string) protocol.ErrorReply { - cluster.db.RWLocks(0, []string{key}, nil) - defer cluster.db.RWUnLocks(0, []string{key}, nil) - return cluster.ensureKey(key) -} diff --git a/cluster/com_factory.go b/cluster/com_factory.go deleted file mode 100644 index c9ab8ae..0000000 --- a/cluster/com_factory.go +++ /dev/null @@ -1,142 +0,0 @@ -package cluster - -import ( - "errors" - "fmt" - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/datastruct/dict" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/pool" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/client" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" - "net" -) - -type defaultClientFactory struct { - nodeConnections dict.Dict // map[string]*pool.Pool -} - -var connectionPoolConfig = pool.Config{ - MaxIdle: 1, - MaxActive: 16, -} - -// GetPeerClient gets a client with peer form pool -func (factory *defaultClientFactory) GetPeerClient(peerAddr string) (peerClient, error) { - var connectionPool *pool.Pool - raw, ok := factory.nodeConnections.Get(peerAddr) - if !ok { - creator := func() (interface{}, error) { - c, err := client.MakeClient(peerAddr) - if err != nil { - return nil, err - } - c.Start() - // all peers of cluster should use the same password - if config.Properties.RequirePass != "" { - authResp := c.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass)) - if !protocol.IsOKReply(authResp) { - return nil, fmt.Errorf("auth failed, resp: %s", string(authResp.ToBytes())) - } - } - return c, nil - } - finalizer := func(x interface{}) { - logger.Debug("destroy client") - cli, ok := x.(client.Client) - if !ok { - return - } - cli.Close() - } - connectionPool = pool.New(creator, finalizer, connectionPoolConfig) - factory.nodeConnections.Put(peerAddr, connectionPool) - } else { - connectionPool = raw.(*pool.Pool) - } - raw, err := connectionPool.Get() - if err != nil { - return nil, err - } - conn, ok := raw.(*client.Client) - if !ok { - return nil, errors.New("connection pool make wrong type") - } - return conn, nil -} - -// ReturnPeerClient returns client to pool -func (factory *defaultClientFactory) ReturnPeerClient(peer string, peerClient peerClient) error { - raw, ok := factory.nodeConnections.Get(peer) - if !ok { - return errors.New("connection pool not found") - } - raw.(*pool.Pool).Put(peerClient) - return nil -} - -type tcpStream struct { - conn net.Conn - ch <-chan *parser.Payload -} - -func (s *tcpStream) Stream() <-chan *parser.Payload { - return s.ch -} - -func (s *tcpStream) Close() error { - return s.conn.Close() -} - -func (factory *defaultClientFactory) NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error) { - // todo: reuse connection - conn, err := net.Dial("tcp", peerAddr) - if err != nil { - return nil, fmt.Errorf("connect with %s failed: %v", peerAddr, err) - } - ch := parser.ParseStream(conn) - send2node := func(cmdLine CmdLine) redis.Reply { - req := protocol.MakeMultiBulkReply(cmdLine) - _, err := conn.Write(req.ToBytes()) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - resp := <-ch - if resp.Err != nil { - return protocol.MakeErrReply(resp.Err.Error()) - } - return resp.Data - } - if config.Properties.RequirePass != "" { - authResp := send2node(utils.ToCmdLine("AUTH", config.Properties.RequirePass)) - if !protocol.IsOKReply(authResp) { - return nil, fmt.Errorf("auth failed, resp: %s", string(authResp.ToBytes())) - } - } - req := protocol.MakeMultiBulkReply(cmdLine) - _, err = conn.Write(req.ToBytes()) - if err != nil { - return nil, protocol.MakeErrReply("send cmdLine failed: " + err.Error()) - } - return &tcpStream{ - conn: conn, - ch: ch, - }, nil -} - -func newDefaultClientFactory() *defaultClientFactory { - return &defaultClientFactory{ - nodeConnections: dict.MakeConcurrent(1), - } -} - -func (factory *defaultClientFactory) Close() error { - factory.nodeConnections.ForEach(func(key string, val interface{}) bool { - val.(*pool.Pool).Close() - return true - }) - return nil -} diff --git a/cluster/com_test.go b/cluster/com_test.go deleted file mode 100644 index ecd9b76..0000000 --- a/cluster/com_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestExec(t *testing.T) { - testCluster := testCluster[0] - conn := connection.NewFakeConn() - for i := 0; i < 1000; i++ { - key := RandString(4) - value := RandString(4) - testCluster.Exec(conn, toArgs("SET", key, value)) - ret := testCluster.Exec(conn, toArgs("GET", key)) - asserts.AssertBulkReply(t, ret, value) - } -} - -func TestAuth(t *testing.T) { - passwd := utils.RandString(10) - config.Properties.RequirePass = passwd - defer func() { - config.Properties.RequirePass = "" - }() - conn := connection.NewFakeConn() - testCluster := testCluster[0] - ret := testCluster.Exec(conn, toArgs("GET", "a")) - asserts.AssertErrReply(t, ret, "NOAUTH Authentication required") - ret = testCluster.Exec(conn, toArgs("AUTH", passwd)) - asserts.AssertStatusReply(t, ret, "OK") - ret = testCluster.Exec(conn, toArgs("GET", "a")) - asserts.AssertNotError(t, ret) -} - -func TestRelay(t *testing.T) { - testNodeA := testCluster[1] - key := RandString(4) - value := RandString(4) - conn := connection.NewFakeConn() - ret := testNodeA.relay(addresses[1], conn, toArgs("SET", key, value)) - asserts.AssertNotError(t, ret) - ret = testNodeA.relay(addresses[1], conn, toArgs("GET", key)) - asserts.AssertBulkReply(t, ret, value) -} - -func TestBroadcast(t *testing.T) { - testCluster2 := testCluster[0] - key := RandString(4) - value := RandString(4) - rets := testCluster2.broadcast(connection.NewFakeConn(), toArgs("SET", key, value)) - for _, v := range rets { - asserts.AssertNotError(t, v) - } -} diff --git a/cluster/commands/default.go b/cluster/commands/default.go new file mode 100644 index 0000000..38ba402 --- /dev/null +++ b/cluster/commands/default.go @@ -0,0 +1,95 @@ +package commands + +import "github.com/hdt3213/godis/cluster/core" + +func RegisterCommands() { + defaultCmds := []string{ + "expire", + "expireAt", + "pExpire", + "pExpireAt", + "ttl", + "PTtl", + "persist", + "exists", + "type", + "set", + "setNx", + "setEx", + "pSetEx", + "get", + "getEx", + "getSet", + "getDel", + "incr", + "incrBy", + "incrByFloat", + "decr", + "decrBy", + "lPush", + "lPushX", + "rPush", + "rPushX", + "LPop", + "RPop", + "LRem", + "LLen", + "LIndex", + "LSet", + "LRange", + "HSet", + "HSetNx", + "HGet", + "HExists", + "HDel", + "HLen", + "HStrLen", + "HMGet", + "HMSet", + "HKeys", + "HVals", + "HGetAll", + "HIncrBy", + "HIncrByFloat", + "HRandField", + "SAdd", + "SIsMember", + "SRem", + "SPop", + "SCard", + "SMembers", + "SInter", + "SInterStore", + "SUnion", + "SUnionStore", + "SDiff", + "SDiffStore", + "SRandMember", + "ZAdd", + "ZScore", + "ZIncrBy", + "ZRank", + "ZCount", + "ZRevRank", + "ZCard", + "ZRange", + "ZRevRange", + "ZRangeByScore", + "ZRevRangeByScore", + "ZRem", + "ZRemRangeByScore", + "ZRemRangeByRank", + "GeoAdd", + "GeoPos", + "GeoDist", + "GeoHash", + "GeoRadius", + "GeoRadiusByMember", + "GetVer", + "DumpKey", + } + for _, name := range defaultCmds { + core.RegisterDefaultCmd(name) + } + +} diff --git a/cluster/copy.go b/cluster/copy.go deleted file mode 100644 index f0cb960..0000000 --- a/cluster/copy.go +++ /dev/null @@ -1,119 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/protocol" - "strconv" - "strings" -) - -const copyToAnotherDBErr = "ERR Copying to another database is not allowed in cluster mode" -const noReplace = "NoReplace" -const useReplace = "UseReplace" - -// Copy copies the value stored at the source key to the destination key. -// the origin and the destination must within the same node. -func Copy(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) < 3 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'copy' command") - } - srcKey := string(args[1]) - destKey := string(args[2]) - srcNode := cluster.pickNodeAddrByKey(srcKey) - destNode := cluster.pickNodeAddrByKey(destKey) - replaceFlag := noReplace - if len(args) > 3 { - for i := 3; i < len(args); i++ { - arg := strings.ToLower(string(args[i])) - if arg == "db" { - return protocol.MakeErrReply(copyToAnotherDBErr) - } else if arg == "replace" { - replaceFlag = useReplace - } else { - return protocol.MakeSyntaxErrReply() - } - } - } - - if srcNode == destNode { - args[0] = []byte("Copy_") // Copy_ will go directly to cluster.DB avoiding infinite recursion - return cluster.relay(srcNode, c, args) - } - groupMap := map[string][]string{ - srcNode: {srcKey}, - destNode: {destKey}, - } - - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - // prepare Copy from - srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "CopyFrom", srcKey)) - if protocol.IsErrorReply(srcPrepareResp) { - // rollback src node - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return srcPrepareResp - } - srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply) - if !ok || len(srcPrepareMBR.Args) < 2 { - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return protocol.MakeErrReply("ERR invalid prepare response") - } - // prepare Copy to - destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr), - []byte("CopyTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1], []byte(replaceFlag))) - if destErr, ok := destPrepareResp.(protocol.ErrorReply); ok { - // rollback src node - requestRollback(cluster, c, txID, groupMap) - if destErr.Error() == keyExistsErr { - return protocol.MakeIntReply(0) - } - return destPrepareResp - } - if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil { - requestRollback(cluster, c, txID, groupMap) - return errReply - } - return protocol.MakeIntReply(1) -} - -// prepareCopyFrom is prepare-function for CopyFrom, see prepareFuncMap -func prepareCopyFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 2 { - return protocol.MakeArgNumErrReply("CopyFrom") - } - key := string(cmdLine[1]) - existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) - if protocol.IsErrorReply(existResp) { - return existResp - } - existIntResp := existResp.(*protocol.IntReply) - if existIntResp.Code == 0 { - return protocol.MakeErrReply("ERR no such key") - } - return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key)) -} - -func prepareCopyTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 5 { - return protocol.MakeArgNumErrReply("CopyTo") - } - key := string(cmdLine[1]) - replaceFlag := string(cmdLine[4]) - existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) - if protocol.IsErrorReply(existResp) { - return existResp - } - existIntResp := existResp.(*protocol.IntReply) - if existIntResp.Code == 1 { - if replaceFlag == noReplace { - return protocol.MakeErrReply(keyExistsErr) - } - } - return protocol.MakeOkReply() -} - -func init() { - registerPrepareFunc("CopyFrom", prepareCopyFrom) - registerPrepareFunc("CopyTo", prepareCopyTo) -} diff --git a/cluster/copy_test.go b/cluster/copy_test.go deleted file mode 100644 index f0f343a..0000000 --- a/cluster/copy_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestCopy(t *testing.T) { - conn := new(connection.FakeConn) - testNodeA := testCluster[0] - testNodeB := testCluster[1] - testNodeA.Exec(conn, utils.ToCmdLine("FlushALL")) - testNodeB.Exec(conn, utils.ToCmdLine("FlushALL")) - - // cross node copy - srcKey := "127.0.0.1:6399Bk2r3Sz0V5" // use fix key to ensure hashing to different node - destKey := "127.0.0.1:7379CcdC0QOopF" - value := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value)) - result := Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey)) - asserts.AssertBulkReply(t, result, value) - result = testNodeB.Exec(conn, utils.ToCmdLine("GET", destKey)) - asserts.AssertBulkReply(t, result, value) - // key exists - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey)) - asserts.AssertIntReply(t, result, 0) - // replace - value = utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value)) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE")) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey)) - asserts.AssertBulkReply(t, result, value) - result = testNodeB.Exec(conn, utils.ToCmdLine("GET", destKey)) - asserts.AssertBulkReply(t, result, value) - // test copy expire time - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "EX", "1000")) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE")) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - result = testNodeB.Exec(conn, utils.ToCmdLine("TTL", destKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - - // same node copy - srcKey = "{" + testNodeA.self + "}" + utils.RandString(10) - destKey = "{" + testNodeA.self + "}" + utils.RandString(9) - value = utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value)) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey)) - asserts.AssertBulkReply(t, result, value) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", destKey)) - asserts.AssertBulkReply(t, result, value) - // key exists - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey)) - asserts.AssertIntReply(t, result, 0) - // replace - value = utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value)) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE")) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey)) - asserts.AssertBulkReply(t, result, value) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", destKey)) - asserts.AssertBulkReply(t, result, value) - // test copy expire time - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "EX", "1000")) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE")) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", destKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) -} - -func TestCopyTimeout(t *testing.T) { - conn := new(connection.FakeConn) - testNodeA := testCluster[0] - testNodeB := testCluster[1] - testNodeA.Exec(conn, utils.ToCmdLine("FlushALL")) - testNodeB.Exec(conn, utils.ToCmdLine("FlushALL")) - - // test src prepare failed - timeoutFlags[0] = true - srcKey := "127.0.0.1:6399Bk2r3Sz0V5" // use fix key to ensure hashing to different node - destKey := "127.0.0.1:7379CcdC0QOopF" - value := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "ex", "1000")) - result := Rename(testNodeB, conn, utils.ToCmdLine("RENAME", srcKey, destKey)) - asserts.AssertErrReply(t, result, "ERR timeout") - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", srcKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - result = testNodeB.Exec(conn, utils.ToCmdLine("EXISTS", destKey)) - asserts.AssertIntReply(t, result, 0) - timeoutFlags[0] = false - - // test dest prepare failed - timeoutFlags[1] = true - value = utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "ex", "1000")) - result = Rename(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey)) - asserts.AssertErrReply(t, result, "ERR timeout") - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", srcKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - result = testNodeB.Exec(conn, utils.ToCmdLine("EXISTS", destKey)) - asserts.AssertIntReply(t, result, 0) - timeoutFlags[1] = false - // Copying to another database - srcKey = testNodeA.self + utils.RandString(10) - value = utils.RandString(10) - destKey = srcKey + utils.RandString(2) - testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value)) - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "db", "1")) - asserts.AssertErrReply(t, result, copyToAnotherDBErr) - - result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey)) - asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'copy' command") -} diff --git a/cluster/core/core.go b/cluster/core/core.go index 0daa5d3..6e58ba6 100644 --- a/cluster/core/core.go +++ b/cluster/core/core.go @@ -7,6 +7,7 @@ import ( dbimpl "github.com/hdt3213/godis/database" "github.com/hdt3213/godis/datastruct/set" "github.com/hdt3213/godis/interface/database" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" ) @@ -130,3 +131,16 @@ func NewCluster(cfg *Config) (*Cluster, error) { cluster.injectDeleteCallback() return cluster, nil } + +// AfterClientClose does some clean after client close connection +func (cluster *Cluster) AfterClientClose(c redis.Connection) { + +} + +func (cluster *Cluster) Close() { + cluster.db.Close() + err := cluster.raftNode.Close() + if err != nil { + panic(err) + } +} \ No newline at end of file diff --git a/cluster/core/core_test.go b/cluster/core/core_test.go index 0b4b8e6..2a9bfbb 100644 --- a/cluster/core/core_test.go +++ b/cluster/core/core_test.go @@ -6,6 +6,9 @@ import ( "time" "github.com/hdt3213/godis/cluster/raft" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/protocol" ) func TestClusterBootstrap(t *testing.T) { @@ -16,6 +19,9 @@ func TestClusterBootstrap(t *testing.T) { defer func() { os.RemoveAll(leaderDir) }() + RegisterDefaultCmd("get") + RegisterDefaultCmd("set") + // connection stub connections := NewInMemConnectionFactory() leaderCfg := &Config{ @@ -35,6 +41,20 @@ func TestClusterBootstrap(t *testing.T) { } connections.nodes[leaderCfg.RedisAdvertiseAddr] = leader + // set key-values for test + testEntries := make(map[string]string) + c := connection.NewFakeConn() + for i := 0; i < 1000; i++ { + key := utils.RandString(10) + value := utils.RandString(10) + testEntries[key] = value + result := leader.Exec(c, utils.ToCmdLine("set", key, value)) + if !protocol.IsOKReply(result) { + t.Errorf("command [set] failed: %s", string(result.ToBytes())) + return + } + } + // start follower followerDir := "test/1" os.RemoveAll(followerDir) @@ -100,4 +120,15 @@ func TestClusterBootstrap(t *testing.T) { time.Sleep(time.Second) } } + + // set key-values for test + for key, value := range testEntries { + c := connection.NewFakeConn() + result := leader.Exec(c, utils.ToCmdLine("get", key)) + result2 := result.(*protocol.BulkReply) + if string(result2.Arg) != value { + t.Errorf("command [get] failed: %s", string(result.ToBytes())) + return + } + } } diff --git a/cluster/del.go b/cluster/del.go deleted file mode 100644 index cbe6087..0000000 --- a/cluster/del.go +++ /dev/null @@ -1,61 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/redis/protocol" - "strconv" -) - -// Del atomically removes given writeKeys from cluster, writeKeys can be distributed on any node -// if the given writeKeys are distributed on different node, Del will use try-commit-catch to remove them -func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) < 2 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'del' command") - } - keys := make([]string, len(args)-1) - for i := 1; i < len(args); i++ { - keys[i-1] = string(args[i]) - } - groupMap := cluster.groupBy(keys) - if len(groupMap) == 1 && allowFastTransaction { // do fast - for peer, group := range groupMap { // only one peerKeys - return cluster.relay(peer, c, makeArgs("Del_", group...)) - } - } - // prepare - var errReply redis.Reply - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - rollback := false - for peer, peerKeys := range groupMap { - peerArgs := []string{txIDStr, "DEL"} - peerArgs = append(peerArgs, peerKeys...) - var resp redis.Reply - resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...)) - if protocol.IsErrorReply(resp) { - errReply = resp - rollback = true - break - } - } - var respList []redis.Reply - if rollback { - // rollback - requestRollback(cluster, c, txID, groupMap) - } else { - // commit - respList, errReply = requestCommit(cluster, c, txID, groupMap) - if errReply != nil { - rollback = true - } - } - if !rollback { - var deleted int64 = 0 - for _, resp := range respList { - intResp := resp.(*protocol.IntReply) - deleted += intResp.Code - } - return protocol.MakeIntReply(int64(deleted)) - } - return errReply -} diff --git a/cluster/del_test.go b/cluster/del_test.go deleted file mode 100644 index 836f0f9..0000000 --- a/cluster/del_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestDel(t *testing.T) { - conn := connection.NewFakeConn() - allowFastTransaction = false - testNodeA := testCluster[0] - testNodeA.Exec(conn, toArgs("SET", "a", "a")) - ret := Del(testNodeA, conn, toArgs("DEL", "a", "b", "c")) - asserts.AssertNotError(t, ret) - ret = testNodeA.Exec(conn, toArgs("GET", "a")) - asserts.AssertNullBulk(t, ret) -} diff --git a/cluster/fixed_topo.go b/cluster/fixed_topo.go deleted file mode 100644 index 41c6f51..0000000 --- a/cluster/fixed_topo.go +++ /dev/null @@ -1,58 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/redis/protocol" - "sync" -) - -// fixedTopology is a fixed cluster topology, used for test -type fixedTopology struct { - mu sync.RWMutex - nodeMap map[string]*Node - slots []*Slot - selfNodeID string -} - -func (fixed *fixedTopology) GetSelfNodeID() string { - return fixed.selfNodeID -} - -func (fixed *fixedTopology) GetNodes() []*Node { - fixed.mu.RLock() - defer fixed.mu.RUnlock() - result := make([]*Node, 0, len(fixed.nodeMap)) - for _, v := range fixed.nodeMap { - result = append(result, v) - } - return result -} - -func (fixed *fixedTopology) GetNode(nodeID string) *Node { - fixed.mu.RLock() - defer fixed.mu.RUnlock() - return fixed.nodeMap[nodeID] -} - -func (fixed *fixedTopology) GetSlots() []*Slot { - return fixed.slots -} - -func (fixed *fixedTopology) StartAsSeed(addr string) protocol.ErrorReply { - return nil -} - -func (fixed *fixedTopology) LoadConfigFile() protocol.ErrorReply { - return nil -} - -func (fixed *fixedTopology) Join(seed string) protocol.ErrorReply { - return protocol.MakeErrReply("fixed topology does not support join") -} - -func (fixed *fixedTopology) SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply { - return protocol.MakeErrReply("fixed topology does not support set slots") -} - -func (fixed *fixedTopology) Close() error { - return nil -} diff --git a/cluster/keys.go b/cluster/keys.go deleted file mode 100644 index b809941..0000000 --- a/cluster/keys.go +++ /dev/null @@ -1,27 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/redis/protocol" -) - -// FlushDB removes all data in current database -func FlushDB(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply { - replies := cluster.broadcast(c, modifyCmd(cmdLine, "FlushDB_")) - var errReply protocol.ErrorReply - for _, v := range replies { - if protocol.IsErrorReply(v) { - errReply = v.(protocol.ErrorReply) - break - } - } - if errReply == nil { - return &protocol.OkReply{} - } - return protocol.MakeErrReply("error occurs: " + errReply.Error()) -} - -// FlushAll removes all data in cluster -func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - return FlushDB(cluster, c, args) -} diff --git a/cluster/mset.go b/cluster/mset.go deleted file mode 100644 index 43c1cea..0000000 --- a/cluster/mset.go +++ /dev/null @@ -1,181 +0,0 @@ -package cluster - -import ( - "fmt" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/protocol" - "strconv" -) - -const keyExistsErr = "key exists" - -// MGet atomically get multi key-value from cluster, writeKeys can be distributed on any node -func MGet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) < 2 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'mget' command") - } - keys := make([]string, len(cmdLine)-1) - for i := 1; i < len(cmdLine); i++ { - keys[i-1] = string(cmdLine[i]) - } - - resultMap := make(map[string][]byte) - groupMap := cluster.groupBy(keys) - for peer, groupKeys := range groupMap { - resp := cluster.relay(peer, c, makeArgs("MGet_", groupKeys...)) - if protocol.IsErrorReply(resp) { - errReply := resp.(protocol.ErrorReply) - return protocol.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", groupKeys[0], errReply.Error())) - } - arrReply, _ := resp.(*protocol.MultiBulkReply) - for i, v := range arrReply.Args { - key := groupKeys[i] - resultMap[key] = v - } - } - result := make([][]byte, len(keys)) - for i, k := range keys { - result[i] = resultMap[k] - } - return protocol.MakeMultiBulkReply(result) -} - -// MSet atomically sets multi key-value in cluster, writeKeys can be distributed on any node -func MSet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - argCount := len(cmdLine) - 1 - if argCount%2 != 0 || argCount < 1 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'mset' command") - } - - size := argCount / 2 - keys := make([]string, size) - valueMap := make(map[string]string) - for i := 0; i < size; i++ { - keys[i] = string(cmdLine[2*i+1]) - valueMap[keys[i]] = string(cmdLine[2*i+2]) - } - - groupMap := cluster.groupBy(keys) - if len(groupMap) == 1 && allowFastTransaction { // do fast - for peer := range groupMap { - return cluster.relay(peer, c, modifyCmd(cmdLine, "MSet_")) - } - } - - //prepare - var errReply redis.Reply - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - rollback := false - for peer, group := range groupMap { - peerArgs := []string{txIDStr, "MSET"} - for _, k := range group { - peerArgs = append(peerArgs, k, valueMap[k]) - } - resp := cluster.relay(peer, c, makeArgs("Prepare", peerArgs...)) - if protocol.IsErrorReply(resp) { - errReply = resp - rollback = true - break - } - } - if rollback { - // rollback - requestRollback(cluster, c, txID, groupMap) - } else { - _, errReply = requestCommit(cluster, c, txID, groupMap) - rollback = errReply != nil - } - if !rollback { - return &protocol.OkReply{} - } - return errReply - -} - -// MSetNX sets multi key-value in database, only if none of the given writeKeys exist and all given writeKeys are on the same node -func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - argCount := len(cmdLine) - 1 - if argCount%2 != 0 || argCount < 1 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'msetnx' command") - } - - size := argCount / 2 - keys := make([]string, size) - valueMap := make(map[string]string) - for i := 0; i < size; i++ { - keys[i] = string(cmdLine[2*i+1]) - valueMap[keys[i]] = string(cmdLine[2*i+2]) - } - - groupMap := cluster.groupBy(keys) - if len(groupMap) == 1 && allowFastTransaction { // do fast - for peer := range groupMap { - return cluster.relay(peer, c, modifyCmd(cmdLine, "MSetNX_")) - } - } - - // prepare procedure: - // 1. Normal tcc preparation (undo log and lock related keys) - // 2. Peer checks whether any key already exists, If so it will return keyExistsErr. Then coordinator will request rollback over all participated nodes - var errReply redis.Reply - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - rollback := false - for node, group := range groupMap { - nodeArgs := []string{txIDStr, "MSETNX"} - for _, k := range group { - nodeArgs = append(nodeArgs, k, valueMap[k]) - } - resp := cluster.relay(node, c, makeArgs("Prepare", nodeArgs...)) - if protocol.IsErrorReply(resp) { - re := resp.(protocol.ErrorReply) - if re.Error() == keyExistsErr { - errReply = protocol.MakeIntReply(0) - } else { - errReply = resp - } - rollback = true - break - } - } - if rollback { - // rollback - requestRollback(cluster, c, txID, groupMap) - return errReply - } - _, errReply = requestCommit(cluster, c, txID, groupMap) - rollback = errReply != nil - if !rollback { - return protocol.MakeIntReply(1) - } - return errReply -} - -func prepareMSetNx(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - args := cmdLine[1:] - if len(args)%2 != 0 { - return protocol.MakeSyntaxErrReply() - } - size := len(args) / 2 - values := make([][]byte, size) - keys := make([]string, size) - for i := 0; i < size; i++ { - keys[i] = string(args[2*i]) - values[i] = args[2*i+1] - } - re := cluster.db.ExecWithLock(conn, utils.ToCmdLine2("ExistIn", keys...)) - if protocol.IsErrorReply(re) { - return re - } - _, ok := re.(*protocol.EmptyMultiBulkReply) - if !ok { - return protocol.MakeErrReply(keyExistsErr) - } - return protocol.MakeOkReply() -} - -func init() { - registerPrepareFunc("MSetNx", prepareMSetNx) -} diff --git a/cluster/mset_test.go b/cluster/mset_test.go deleted file mode 100644 index 18127a0..0000000 --- a/cluster/mset_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestMSet(t *testing.T) { - conn := connection.NewFakeConn() - allowFastTransaction = false - testNodeA := testCluster[0] - ret := MSet(testNodeA, conn, toArgs("MSET", "a", "a", "b", "b")) - asserts.AssertNotError(t, ret) - ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b")) - asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"}) -} - -func TestMSetNx(t *testing.T) { - conn := connection.NewFakeConn() - allowFastTransaction = false - testNodeA := testCluster[0] - FlushAll(testNodeA, conn, toArgs("FLUSHALL")) - ret := MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "b", "b")) - asserts.AssertNotError(t, ret) - ret = MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "c", "c")) - asserts.AssertNotError(t, ret) - ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b", "c")) - asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""}) -} diff --git a/cluster/multi.go b/cluster/multi.go deleted file mode 100644 index 9bfd740..0000000 --- a/cluster/multi.go +++ /dev/null @@ -1,165 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" - "strconv" -) - -const relayMulti = "multi_" -const innerWatch = "watch_" - -var relayMultiBytes = []byte(relayMulti) - -// cmdLine == []string{"exec"} -func execMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if !conn.InMultiState() { - return protocol.MakeErrReply("ERR EXEC without MULTI") - } - defer conn.SetMultiState(false) - cmdLines := conn.GetQueuedCmdLine() - - // analysis related keys - keys := make([]string, 0) // may contains duplicate - for _, cl := range cmdLines { - wKeys, rKeys := database.GetRelatedKeys(cl) - keys = append(keys, wKeys...) - keys = append(keys, rKeys...) - } - watching := conn.GetWatching() - watchingKeys := make([]string, 0, len(watching)) - for key := range watching { - watchingKeys = append(watchingKeys, key) - } - keys = append(keys, watchingKeys...) - if len(keys) == 0 { - // empty transaction or only `PING`s - return cluster.db.ExecMulti(conn, watching, cmdLines) - } - groupMap := cluster.groupBy(keys) - if len(groupMap) > 1 { - return protocol.MakeErrReply("ERR MULTI commands transaction must within one slot in cluster mode") - } - var peer string - // assert len(groupMap) == 1 - for p := range groupMap { - peer = p - } - - // out parser not support protocol.MultiRawReply, so we have to encode it - if peer == cluster.self { - for _, key := range keys { - if errReply := cluster.ensureKey(key); errReply != nil { - return errReply - } - } - return cluster.db.ExecMulti(conn, watching, cmdLines) - } - return execMultiOnOtherNode(cluster, conn, peer, watching, cmdLines) -} - -func execMultiOnOtherNode(cluster *Cluster, conn redis.Connection, peer string, watching map[string]uint32, cmdLines []CmdLine) redis.Reply { - defer func() { - conn.ClearQueuedCmds() - conn.SetMultiState(false) - }() - relayCmdLine := [][]byte{ // relay it to executing node - relayMultiBytes, - } - // watching commands - var watchingCmdLine = utils.ToCmdLine(innerWatch) - for key, ver := range watching { - verStr := strconv.FormatUint(uint64(ver), 10) - watchingCmdLine = append(watchingCmdLine, []byte(key), []byte(verStr)) - } - relayCmdLine = append(relayCmdLine, encodeCmdLine([]CmdLine{watchingCmdLine})...) - relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...) - var rawRelayResult redis.Reply - rawRelayResult = cluster.relay(peer, connection.NewFakeConn(), relayCmdLine) - if protocol.IsErrorReply(rawRelayResult) { - return rawRelayResult - } - _, ok := rawRelayResult.(*protocol.EmptyMultiBulkReply) - if ok { - return rawRelayResult - } - relayResult, ok := rawRelayResult.(*protocol.MultiBulkReply) - if !ok { - return protocol.MakeErrReply("execute failed") - } - rep, err := parseEncodedMultiRawReply(relayResult.Args) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - return rep -} - -// execRelayedMulti execute relayed multi commands transaction -// cmdLine format: _multi watch-cmdLine base64ed-cmdLine -// result format: base64ed-protocol list -func execRelayedMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) < 2 { - return protocol.MakeArgNumErrReply("_exec") - } - decoded, err := parseEncodedMultiRawReply(cmdLine[1:]) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - var txCmdLines []CmdLine - for _, rep := range decoded.Replies { - mbr, ok := rep.(*protocol.MultiBulkReply) - if !ok { - return protocol.MakeErrReply("exec failed") - } - txCmdLines = append(txCmdLines, mbr.Args) - } - watching := make(map[string]uint32) - watchCmdLine := txCmdLines[0] // format: watch_ key1 ver1 key2 ver2... - for i := 2; i < len(watchCmdLine); i += 2 { - key := string(watchCmdLine[i-1]) - verStr := string(watchCmdLine[i]) - ver, err := strconv.ParseUint(verStr, 10, 64) - if err != nil { - return protocol.MakeErrReply("watching command line failed") - } - watching[key] = uint32(ver) - } - rawResult := cluster.db.ExecMulti(conn, watching, txCmdLines[1:]) - _, ok := rawResult.(*protocol.EmptyMultiBulkReply) - if ok { - return rawResult - } - resultMBR, ok := rawResult.(*protocol.MultiRawReply) - if !ok { - return protocol.MakeErrReply("exec failed") - } - return encodeMultiRawReply(resultMBR) -} - -func execWatch(cluster *Cluster, conn redis.Connection, args [][]byte) redis.Reply { - if len(args) < 2 { - return protocol.MakeArgNumErrReply("watch") - } - args = args[1:] - watching := conn.GetWatching() - for _, bkey := range args { - key := string(bkey) - err := cluster.ensureKey(key) - if err != nil { - return err - } - result := cluster.relayByKey(key, conn, utils.ToCmdLine("GetVer", key)) - if protocol.IsErrorReply(result) { - return result - } - intResult, ok := result.(*protocol.IntReply) - if !ok { - return protocol.MakeErrReply("get version failed") - } - watching[key] = uint32(intResult.Code) - } - return protocol.MakeOkReply() -} diff --git a/cluster/multi_helper.go b/cluster/multi_helper.go deleted file mode 100644 index 2d1f6c2..0000000 --- a/cluster/multi_helper.go +++ /dev/null @@ -1,48 +0,0 @@ -package cluster - -import ( - "bytes" - "encoding/base64" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" -) - -func encodeCmdLine(cmdLines []CmdLine) [][]byte { - var result [][]byte - for _, line := range cmdLines { - raw := protocol.MakeMultiBulkReply(line).ToBytes() - encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw))) - base64.StdEncoding.Encode(encoded, raw) - result = append(result, encoded) - } - return result -} - -func parseEncodedMultiRawReply(args [][]byte) (*protocol.MultiRawReply, error) { - cmdBuf := new(bytes.Buffer) - for _, arg := range args { - dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(arg))) - n, err := base64.StdEncoding.Decode(dbuf, arg) - if err != nil { - continue - } - cmdBuf.Write(dbuf[:n]) - } - cmds, err := parser.ParseBytes(cmdBuf.Bytes()) - if err != nil { - return nil, protocol.MakeErrReply(err.Error()) - } - return protocol.MakeMultiRawReply(cmds), nil -} - -// todo: use multi raw reply instead of base64 -func encodeMultiRawReply(src *protocol.MultiRawReply) *protocol.MultiBulkReply { - args := make([][]byte, 0, len(src.Replies)) - for _, rep := range src.Replies { - raw := rep.ToBytes() - encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw))) - base64.StdEncoding.Encode(encoded, raw) - args = append(args, encoded) - } - return protocol.MakeMultiBulkReply(args) -} diff --git a/cluster/multi_test.go b/cluster/multi_test.go deleted file mode 100644 index 8413196..0000000 --- a/cluster/multi_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestMultiExecOnSelf(t *testing.T) { - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testNodeA.Exec(conn, toArgs("MULTI")) - asserts.AssertNotError(t, result) - key := "{abc}" + utils.RandString(10) - value := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("set", key, value)) - key2 := "{abc}" + utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("rpush", key2, value)) - result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) - asserts.AssertNotError(t, result) - result = testNodeA.Exec(conn, utils.ToCmdLine("get", key)) - asserts.AssertBulkReply(t, result, value) - result = testNodeA.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1")) - asserts.AssertMultiBulkReply(t, result, []string{value}) -} - -func TestEmptyMulti(t *testing.T) { - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testNodeA.Exec(conn, toArgs("MULTI")) - asserts.AssertNotError(t, result) - result = testNodeA.Exec(conn, utils.ToCmdLine("GET", "a")) - asserts.AssertNotError(t, result) - result = testNodeA.Exec(conn, utils.ToCmdLine("EXEC")) - asserts.AssertNotError(t, result) - mbr := result.(*protocol.MultiRawReply) - asserts.AssertNullBulk(t, mbr.Replies[0]) -} - -func TestMultiExecOnOthers(t *testing.T) { - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testNodeA.Exec(conn, toArgs("MULTI")) - asserts.AssertNotError(t, result) - key := utils.RandString(10) - value := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("rpush", key, value)) - testNodeA.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1")) - - cmdLines := conn.GetQueuedCmdLine() - rawResp := execMultiOnOtherNode(testNodeA, conn, testNodeA.self, nil, cmdLines) - rep := rawResp.(*protocol.MultiRawReply) - if len(rep.Replies) != 2 { - t.Errorf("expect 2 replies actual %d", len(rep.Replies)) - } - asserts.AssertMultiBulkReply(t, rep.Replies[1], []string{value}) -} - -func TestWatch(t *testing.T) { - testNodeA := testCluster[0] - for i := 0; i < 10; i++ { - conn := new(connection.FakeConn) - key := "{1}" + utils.RandString(10) - key2 := "{1}" + utils.RandString(10) // use hash tag to ensure same slot - value := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL")) - testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) - testNodeA.Exec(conn, utils.ToCmdLine("set", key, value)) - result := testNodeA.Exec(conn, toArgs("MULTI")) - asserts.AssertNotError(t, result) - value2 := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) - result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) - asserts.AssertNotError(t, result) - result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) - asserts.AssertNullBulk(t, result) - - testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) - result = testNodeA.Exec(conn, toArgs("MULTI")) - asserts.AssertNotError(t, result) - testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) - result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) - asserts.AssertNotError(t, result) - result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) - asserts.AssertBulkReply(t, result, value2) - } -} diff --git a/cluster/pubsub.go b/cluster/pubsub.go deleted file mode 100644 index ed86b2d..0000000 --- a/cluster/pubsub.go +++ /dev/null @@ -1,35 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/redis/protocol" -) - -const ( - relayPublish = "publish_" -) - -// Publish broadcasts msg to all peers in cluster when receive publish command from client -func Publish(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply { - var count int64 = 0 - results := cluster.broadcast(c, modifyCmd(cmdLine, relayPublish)) - for _, val := range results { - if errReply, ok := val.(protocol.ErrorReply); ok { - logger.Error("publish occurs error: " + errReply.Error()) - } else if intReply, ok := val.(*protocol.IntReply); ok { - count += intReply.Code - } - } - return protocol.MakeIntReply(count) -} - -// Subscribe puts the given connection into the given channel -func Subscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - return cluster.db.Exec(c, args) // let local db.hub handle subscribe -} - -// UnSubscribe removes the given connection from the given channel -func UnSubscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - return cluster.db.Exec(c, args) // let local db.hub handle subscribe -} diff --git a/cluster/pubsub_test.go b/cluster/pubsub_test.go deleted file mode 100644 index 0894b1b..0000000 --- a/cluster/pubsub_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestPublish(t *testing.T) { - testNodeA := testCluster[0] - channel := utils.RandString(5) - msg := utils.RandString(5) - conn := connection.NewFakeConn() - Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel)) - conn.Clean() // clean subscribe success - Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) - data := conn.Bytes() - ret, err := parser.ParseOne(data) - if err != nil { - t.Error(err) - return - } - asserts.AssertMultiBulkReply(t, ret, []string{ - "message", - channel, - msg, - }) - - // unsubscribe - UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE", channel)) - conn.Clean() - Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) - data = conn.Bytes() - if len(data) > 0 { - t.Error("expect no msg") - } - - // unsubscribe all - Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel)) - UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE")) - conn.Clean() - Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) - data = conn.Bytes() - if len(data) > 0 { - t.Error("expect no msg") - } -} diff --git a/cluster/raft.go b/cluster/raft.go deleted file mode 100644 index 55dd496..0000000 --- a/cluster/raft.go +++ /dev/null @@ -1,1075 +0,0 @@ -package cluster - -import ( - "bufio" - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/datastruct/lock" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" - "math/rand" - "os" - "sort" - "strconv" - "strings" - "sync" - "time" -) - -const slotCount int = 16384 - -type raftState int - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -const ( - nodeFlagLeader uint32 = 1 << iota - nodeFlagCandidate - nodeFlagLearner -) - -const ( - follower raftState = iota - leader - candidate - learner -) - -var stateNames = map[raftState]string{ - follower: "follower", - leader: "leader", - candidate: "candidate", - learner: "learner", -} - -func (node *Node) setState(state raftState) { - node.Flags &= ^uint32(0x7) // clean - switch state { - case follower: - break - case leader: - node.Flags |= nodeFlagLeader - case candidate: - node.Flags |= nodeFlagCandidate - case learner: - node.Flags |= nodeFlagLearner - } -} - -func (node *Node) getState() raftState { - if node.Flags&nodeFlagLeader > 0 { - return leader - } - if node.Flags&nodeFlagCandidate > 0 { - return candidate - } - if node.Flags&nodeFlagLearner > 0 { - return learner - } - return follower -} - -type logEntry struct { - Term int - Index int - Event int - wg *sync.WaitGroup - // payload - SlotIDs []uint32 - NodeID string - Addr string -} - -func (e *logEntry) marshal() []byte { - bin, _ := json.Marshal(e) - return bin -} - -func (e *logEntry) unmarshal(bin []byte) error { - err := json.Unmarshal(bin, e) - if err != nil { - return fmt.Errorf("illegal message: %v", err) - } - return nil -} - -type Raft struct { - cluster *Cluster - mu sync.RWMutex - selfNodeID string - slots []*Slot - leaderId string - nodes map[string]*Node - log []*logEntry // log index begin from 0 - baseIndex int // baseIndex + 1 == log[0].Index, it can be considered as the previous log index - baseTerm int // baseTerm is the term of the previous log entry - state raftState - term int - votedFor string - voteCount int - committedIndex int // index of the last committed logEntry - proposedIndex int // index of the last proposed logEntry - heartbeatChan chan *heartbeat - persistFile string - electionAlarm time.Time - closeChan chan struct{} - closed bool - - // for leader - nodeIndexMap map[string]*nodeStatus - nodeLock *lock.Locks -} - -func newRaft(cluster *Cluster, persistFilename string) *Raft { - return &Raft{ - cluster: cluster, - persistFile: persistFilename, - closeChan: make(chan struct{}), - } -} - -type heartbeat struct { - sender string - term int - entries []*logEntry - commitTo int -} - -type nodeStatus struct { - receivedIndex int // received log index, not committed index -} - -func (raft *Raft) GetNodes() []*Node { - raft.mu.RLock() - defer raft.mu.RUnlock() - result := make([]*Node, 0, len(raft.nodes)) - for _, v := range raft.nodes { - result = append(result, v) - } - return result -} - -func (raft *Raft) GetNode(nodeID string) *Node { - raft.mu.RLock() - defer raft.mu.RUnlock() - return raft.nodes[nodeID] -} - -func (raft *Raft) getLogEntries(beg, end int) []*logEntry { - if beg <= raft.baseIndex || end > raft.baseIndex+len(raft.log)+1 { - return nil - } - i := beg - raft.baseIndex - 1 - j := end - raft.baseIndex - 1 - return raft.log[i:j] -} - -func (raft *Raft) getLogEntriesFrom(beg int) []*logEntry { - if beg <= raft.baseIndex { - return nil - } - i := beg - raft.baseIndex - 1 - return raft.log[i:] -} - -func (raft *Raft) getLogEntry(idx int) *logEntry { - if idx < raft.baseIndex || idx >= raft.baseIndex+len(raft.log) { - return nil - } - return raft.log[idx-raft.baseIndex] -} - -func (raft *Raft) initLog(baseTerm, baseIndex int, entries []*logEntry) { - raft.baseIndex = baseIndex - raft.baseTerm = baseTerm - raft.log = entries -} - -const ( - electionTimeoutMaxMs = 4000 - electionTimeoutMinMs = 2800 -) - -func randRange(from, to int) int { - return rand.Intn(to-from) + from -} - -// nextElectionAlarm generates normal election timeout, with randomness -func nextElectionAlarm() time.Time { - return time.Now().Add(time.Duration(randRange(electionTimeoutMinMs, electionTimeoutMaxMs)) * time.Millisecond) -} - -func compareLogIndex(term1, index1, term2, index2 int) int { - if term1 != term2 { - return term1 - term2 - } - return index1 - index2 -} - -func (cluster *Cluster) asRaft() *Raft { - return cluster.topology.(*Raft) -} - -// StartAsSeed starts cluster as seed node -func (raft *Raft) StartAsSeed(listenAddr string) protocol.ErrorReply { - selfNodeID := listenAddr - raft.mu.Lock() - defer raft.mu.Unlock() - raft.slots = make([]*Slot, slotCount) - // claim all slots - for i := range raft.slots { - raft.slots[i] = &Slot{ - ID: uint32(i), - NodeID: selfNodeID, - } - } - raft.selfNodeID = selfNodeID - raft.leaderId = selfNodeID - raft.nodes = make(map[string]*Node) - raft.nodes[selfNodeID] = &Node{ - ID: selfNodeID, - Addr: listenAddr, - Slots: raft.slots, - } - raft.nodes[selfNodeID].setState(leader) - raft.nodeIndexMap = map[string]*nodeStatus{ - selfNodeID: { - receivedIndex: raft.proposedIndex, - }, - } - raft.start(leader) - raft.cluster.self = selfNodeID - return nil -} - -func (raft *Raft) GetSlots() []*Slot { - return raft.slots -} - -// GetSelfNodeID returns node id of current node -func (raft *Raft) GetSelfNodeID() string { - return raft.selfNodeID -} - -const raftClosed = "ERR raft has closed" - -func (raft *Raft) start(state raftState) { - raft.state = state - raft.heartbeatChan = make(chan *heartbeat, 1) - raft.electionAlarm = nextElectionAlarm() - //raft.nodeIndexMap = make(map[string]*nodeStatus) - go func() { - for { - if raft.closed { - logger.Info("quit raft job") - return - } - switch raft.state { - case follower: - raft.followerJob() - case candidate: - raft.candidateJob() - case leader: - raft.leaderJob() - } - } - }() -} - -func (raft *Raft) Close() error { - raft.closed = true - close(raft.closeChan) - return raft.persist() -} - -func (raft *Raft) followerJob() { - electionTimeout := time.Until(raft.electionAlarm) - select { - case hb := <-raft.heartbeatChan: - raft.mu.Lock() - nodeId := hb.sender - raft.nodes[nodeId].lastHeard = time.Now() - // todo: drop duplicate entry - raft.log = append(raft.log, hb.entries...) - raft.proposedIndex += len(hb.entries) - raft.applyLogEntries(raft.getLogEntries(raft.committedIndex+1, hb.commitTo+1)) - raft.committedIndex = hb.commitTo - raft.electionAlarm = nextElectionAlarm() - raft.mu.Unlock() - case <-time.After(electionTimeout): - // change to candidate - logger.Info("raft leader timeout") - raft.mu.Lock() - raft.electionAlarm = nextElectionAlarm() - if raft.votedFor != "" { - // received request-vote and has voted during waiting timeout - raft.mu.Unlock() - logger.Infof("%s has voted for %s, give up being a candidate", raft.selfNodeID, raft.votedFor) - return - } - logger.Info("change to candidate") - raft.state = candidate - raft.mu.Unlock() - case <-raft.closeChan: - return - } -} - -func (raft *Raft) getLogProgressWithinLock() (int, int) { - var lastLogTerm, lastLogIndex int - if len(raft.log) > 0 { - lastLog := raft.log[len(raft.log)-1] - lastLogTerm = lastLog.Term - lastLogIndex = lastLog.Index - } else { - lastLogTerm = raft.baseTerm - lastLogIndex = raft.baseIndex - } - return lastLogTerm, lastLogIndex -} - -func (raft *Raft) candidateJob() { - raft.mu.Lock() - - raft.term++ - raft.votedFor = raft.selfNodeID - raft.voteCount++ - currentTerm := raft.term - lastLogTerm, lastLogIndex := raft.getLogProgressWithinLock() - req := &voteReq{ - nodeID: raft.selfNodeID, - lastLogTerm: lastLogTerm, - lastLogIndex: lastLogIndex, - term: raft.term, - } - raft.mu.Unlock() - args := append([][]byte{ - []byte("raft"), - []byte("request-vote"), - }, req.marshal()...) - conn := connection.NewFakeConn() - wg := sync.WaitGroup{} - elected := make(chan struct{}, len(raft.nodes)) // may receive many elected message during an election, only handle the first one - voteFinished := make(chan struct{}) - for nodeID := range raft.nodes { - if nodeID == raft.selfNodeID { - continue - } - nodeID := nodeID - wg.Add(1) - go func() { - defer wg.Done() - rawResp := raft.cluster.relay(nodeID, conn, args) - if err, ok := rawResp.(protocol.ErrorReply); ok { - logger.Info(fmt.Sprintf("cannot get vote response from %s, %v", nodeID, err)) - return - } - respBody, ok := rawResp.(*protocol.MultiBulkReply) - if !ok { - logger.Info(fmt.Sprintf("cannot get vote response from %s, not a multi bulk reply", nodeID)) - return - } - resp := &voteResp{} - err := resp.unmarshal(respBody.Args) - if err != nil { - logger.Info(fmt.Sprintf("cannot get vote response from %s, %v", nodeID, err)) - return - } - - raft.mu.Lock() - defer raft.mu.Unlock() - logger.Info("received vote response from " + nodeID) - // check-lock-check - if currentTerm != raft.term || raft.state != candidate { - // vote has finished during waiting lock - logger.Info("vote has finished during waiting lock, current term " + strconv.Itoa(raft.term) + " state " + strconv.Itoa(int(raft.state))) - return - } - if resp.term > raft.term { - logger.Infof(fmt.Sprintf("vote response from %s has newer term %d", nodeID, resp.term)) - raft.term = resp.term - raft.state = follower - raft.votedFor = "" - raft.leaderId = resp.voteFor - return - } - - if resp.voteFor == raft.selfNodeID { - logger.Infof(fmt.Sprintf("get vote from %s", nodeID)) - raft.voteCount++ - if raft.voteCount >= len(raft.nodes)/2+1 { - logger.Info("elected to be the leader") - raft.state = leader - elected <- struct{}{} // notify the main goroutine to stop waiting - return - } - } - }() - } - go func() { - wg.Wait() - voteFinished <- struct{}{} - }() - - // wait vote finished or elected - select { - case <-voteFinished: - raft.mu.Lock() - if raft.term == currentTerm && raft.state == candidate { - logger.Infof("%s failed to be elected, back to follower", raft.selfNodeID) - raft.state = follower - raft.votedFor = "" - raft.voteCount = 0 - } - raft.mu.Unlock() - case <-elected: - raft.votedFor = "" - raft.voteCount = 0 - logger.Info("win election, take leader of term " + strconv.Itoa(currentTerm)) - case <-raft.closeChan: - return - } -} - -// getNodeIndexMap ask offset of each node and init nodeIndexMap as new leader -// invoker provide lock -func (raft *Raft) getNodeIndexMap() { - // ask node index - nodeIndexMap := make(map[string]*nodeStatus) - for _, node := range raft.nodes { - status := raft.askNodeIndex(node) - if status != nil { - nodeIndexMap[node.ID] = status - } - } - logger.Info("got offsets of nodes") - raft.nodeIndexMap = nodeIndexMap -} - -// askNodeIndex ask another node for its log index -// return nil if failed -func (raft *Raft) askNodeIndex(node *Node) *nodeStatus { - if node.ID == raft.selfNodeID { - return &nodeStatus{ - receivedIndex: raft.proposedIndex, - } - } - logger.Debugf("ask %s for offset", node.ID) - c := connection.NewFakeConn() - reply := raft.cluster.relay(node.Addr, c, utils.ToCmdLine("raft", "get-offset")) - if protocol.IsErrorReply(reply) { - logger.Infof("ask node %s index failed: %v", node.ID, reply) - return nil - } - return &nodeStatus{ - receivedIndex: int(reply.(*protocol.IntReply).Code), - } -} - -func (raft *Raft) leaderJob() { - raft.mu.Lock() - if raft.nodeIndexMap == nil { - // getNodeIndexMap with lock, because leader cannot work without nodeIndexMap - raft.getNodeIndexMap() - } - if raft.nodeLock == nil { - raft.nodeLock = lock.Make(1024) - } - var recvedIndices []int - for _, status := range raft.nodeIndexMap { - recvedIndices = append(recvedIndices, status.receivedIndex) - } - sort.Slice(recvedIndices, func(i, j int) bool { - return recvedIndices[i] > recvedIndices[j] - }) - // more than half of the nodes received entries, can be committed - commitTo := 0 - if len(recvedIndices) > 0 { - commitTo = recvedIndices[len(recvedIndices)/2] - } - // new node (received index is 0) may cause commitTo less than raft.committedIndex - if commitTo > raft.committedIndex { - toCommit := raft.getLogEntries(raft.committedIndex+1, commitTo+1) // left inclusive, right exclusive - raft.applyLogEntries(toCommit) - raft.committedIndex = commitTo - for _, entry := range toCommit { - if entry.wg != nil { - entry.wg.Done() - } - } - } - // save receivedIndex in local variable in case changed by other goroutines - proposalIndex := raft.proposedIndex - snapshot := raft.makeSnapshot() // the snapshot is consistent with the committed log - for _, node := range raft.nodes { - if node.ID == raft.selfNodeID { - continue - } - node := node - status := raft.nodeIndexMap[node.ID] - go func() { - raft.nodeLock.Lock(node.ID) - defer raft.nodeLock.UnLock(node.ID) - var cmdLine [][]byte - if status == nil { - logger.Debugf("node %s offline", node.ID) - status = raft.askNodeIndex(node) - if status != nil { - // get status, node has back online - raft.mu.Lock() - raft.nodeIndexMap[node.ID] = status - raft.mu.Unlock() - } else { - // node still offline - return - } - } - if status.receivedIndex < raft.baseIndex { - // some entries are missed due to change of leader, send full snapshot - cmdLine = utils.ToCmdLine( - "raft", - "load-snapshot", - raft.selfNodeID, - ) - // see makeSnapshotForFollower - cmdLine = append(cmdLine, []byte(node.ID), []byte(strconv.Itoa(int(follower)))) - cmdLine = append(cmdLine, snapshot[2:]...) - } else { - // leader has all needed entries, send normal heartbeat - req := &heartbeatRequest{ - leaderId: raft.leaderId, - term: raft.term, - commitTo: commitTo, - } - // append new entries to heartbeat payload - if proposalIndex > status.receivedIndex { - req.prevLogTerm = raft.getLogEntry(status.receivedIndex).Term - req.prevLogIndex = status.receivedIndex - req.entries = raft.getLogEntriesFrom(status.receivedIndex + 1) - } - cmdLine = utils.ToCmdLine( - "raft", - "heartbeat", - ) - cmdLine = append(cmdLine, req.marshal()...) - } - - conn := connection.NewFakeConn() - resp := raft.cluster.relay(node.ID, conn, cmdLine) - switch respPayload := resp.(type) { - case *protocol.MultiBulkReply: - term, _ := strconv.Atoi(string(respPayload.Args[0])) - recvedIndex, _ := strconv.Atoi(string(respPayload.Args[1])) - if term > raft.term { - // todo: rejoin as follower - return - } - raft.mu.Lock() - raft.nodeIndexMap[node.ID].receivedIndex = recvedIndex - raft.mu.Unlock() - case protocol.ErrorReply: - if respPayload.Error() == prevLogMismatch { - cmdLine = utils.ToCmdLine( - "raft", - "load-snapshot", - raft.selfNodeID, - ) - cmdLine = append(cmdLine, []byte(node.ID), []byte(strconv.Itoa(int(follower)))) - cmdLine = append(cmdLine, snapshot[2:]...) - resp := raft.cluster.relay(node.ID, conn, cmdLine) - if err, ok := resp.(protocol.ErrorReply); ok { - logger.Errorf("heartbeat to %s failed: %v", node.ID, err) - return - } - } else if respPayload.Error() == nodeNotReady { - logger.Infof("%s is not ready yet", node.ID) - return - } else { - logger.Errorf("heartbeat to %s failed: %v", node.ID, respPayload.Error()) - return - } - - } - }() - } - raft.mu.Unlock() - time.Sleep(time.Millisecond * 1000) -} - -func init() { - registerCmd("raft", execRaft) -} - -func execRaft(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - raft := cluster.asRaft() - if raft.closed { - return protocol.MakeErrReply(raftClosed) - } - if len(args) < 2 { - return protocol.MakeArgNumErrReply("raft") - } - subCmd := strings.ToLower(string(args[1])) - switch subCmd { - case "request-vote": - // command line: raft request-vote nodeId index term - // Decide whether to vote when other nodes solicit votes - return execRaftRequestVote(cluster, c, args[2:]) - case "heartbeat": - // execRaftHeartbeat handles heartbeat from leader as follower or learner - // command line: raft heartbeat nodeID term number-of-log-log log log - return execRaftHeartbeat(cluster, c, args[2:]) - case "load-snapshot": - // execRaftLoadSnapshot load snapshot from leader - // command line: raft load-snapshot leaderId snapshot(see raft.makeSnapshot) - return execRaftLoadSnapshot(cluster, c, args[2:]) - case "propose": - // execRaftPropose handles event proposal as leader - // command line: raft propose - return execRaftPropose(cluster, c, args[2:]) - case "join": - // execRaftJoin handles requests from a new node to join raft group as leader - // command line: raft join
- return execRaftJoin(cluster, c, args[2:]) - case "get-leader": - // execRaftGetLeader returns leader id and address - return execRaftGetLeader(cluster, c, args[2:]) - case "get-offset": - // execRaftGetOffset returns log offset of current leader - return execRaftGetOffset(cluster, c, args[2:]) - } - return protocol.MakeErrReply(" ERR unknown raft sub command '" + subCmd + "'") -} - -type voteReq struct { - nodeID string - term int - lastLogIndex int - lastLogTerm int -} - -func (req *voteReq) marshal() [][]byte { - lastLogIndexBin := []byte(strconv.Itoa(req.lastLogIndex)) - lastLogTermBin := []byte(strconv.Itoa(req.lastLogTerm)) - termBin := []byte(strconv.Itoa(req.term)) - return [][]byte{ - []byte(req.nodeID), - termBin, - lastLogIndexBin, - lastLogTermBin, - } -} - -func (req *voteReq) unmarshal(bin [][]byte) error { - req.nodeID = string(bin[0]) - term, err := strconv.Atoi(string(bin[1])) - if err != nil { - return fmt.Errorf("illegal term %s", string(bin[2])) - } - req.term = term - logIndex, err := strconv.Atoi(string(bin[2])) - if err != nil { - return fmt.Errorf("illegal index %s", string(bin[1])) - } - req.lastLogIndex = logIndex - logTerm, err := strconv.Atoi(string(bin[3])) - if err != nil { - return fmt.Errorf("illegal index %s", string(bin[1])) - } - req.lastLogTerm = logTerm - return nil -} - -type voteResp struct { - voteFor string - term int -} - -func (resp *voteResp) unmarshal(bin [][]byte) error { - if len(bin) != 2 { - return errors.New("illegal vote resp length") - } - resp.voteFor = string(bin[0]) - term, err := strconv.Atoi(string(bin[1])) - if err != nil { - return fmt.Errorf("illegal term: %s", string(bin[1])) - } - resp.term = term - return nil -} - -func (resp *voteResp) marshal() [][]byte { - return [][]byte{ - []byte(resp.voteFor), - []byte(strconv.Itoa(resp.term)), - } -} - -// execRaftRequestVote command line: raft request-vote nodeID index term -// Decide whether to vote when other nodes solicit votes -func execRaftRequestVote(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) != 4 { - return protocol.MakeArgNumErrReply("raft request-vote") - } - req := &voteReq{} - err := req.unmarshal(args) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - raft := cluster.asRaft() - raft.mu.Lock() - defer raft.mu.Unlock() - logger.Info("recv request vote from " + req.nodeID + ", term: " + strconv.Itoa(req.term)) - resp := &voteResp{} - if req.term < raft.term { - resp.term = raft.term - resp.voteFor = raft.leaderId // tell candidate the new leader - logger.Info("deny request vote from " + req.nodeID + " for earlier term") - return protocol.MakeMultiBulkReply(resp.marshal()) - } - // todo: if req.term > raft.term step down as leader? - lastLogTerm, lastLogIndex := raft.getLogProgressWithinLock() - if compareLogIndex(req.lastLogTerm, req.lastLogIndex, lastLogTerm, lastLogIndex) < 0 { - resp.term = raft.term - resp.voteFor = raft.votedFor - logger.Info("deny request vote from " + req.nodeID + " for log progress") - logger.Info("request vote proposal index " + strconv.Itoa(req.lastLogIndex) + " self index " + strconv.Itoa(raft.proposedIndex)) - return protocol.MakeMultiBulkReply(resp.marshal()) - } - if raft.votedFor != "" && raft.votedFor != raft.selfNodeID { - resp.term = raft.term - resp.voteFor = raft.votedFor - logger.Info("deny request vote from " + req.nodeID + " for voted") - return protocol.MakeMultiBulkReply(resp.marshal()) - } - if raft.votedFor == raft.selfNodeID && - raft.voteCount == 1 { - // cancel vote for self to avoid live lock - raft.votedFor = "" - raft.voteCount = 0 - } - logger.Info("accept request vote from " + req.nodeID) - raft.votedFor = req.nodeID - raft.term = req.term - raft.electionAlarm = nextElectionAlarm() - resp.voteFor = req.nodeID - resp.term = raft.term - return protocol.MakeMultiBulkReply(resp.marshal()) -} - -type heartbeatRequest struct { - leaderId string - term int - commitTo int - prevLogTerm int - prevLogIndex int - entries []*logEntry -} - -func (req *heartbeatRequest) marshal() [][]byte { - cmdLine := utils.ToCmdLine( - req.leaderId, - strconv.Itoa(req.term), - strconv.Itoa(req.commitTo), - ) - if len(req.entries) > 0 { - cmdLine = append(cmdLine, - []byte(strconv.Itoa(req.prevLogTerm)), - []byte(strconv.Itoa(req.prevLogIndex)), - ) - for _, entry := range req.entries { - cmdLine = append(cmdLine, entry.marshal()) - } - } - return cmdLine -} - -func (req *heartbeatRequest) unmarshal(args [][]byte) protocol.ErrorReply { - if len(args) < 6 && len(args) != 3 { - return protocol.MakeArgNumErrReply("raft heartbeat") - } - req.leaderId = string(args[0]) - var err error - req.term, err = strconv.Atoi(string(args[1])) - if err != nil { - return protocol.MakeErrReply("illegal term: " + string(args[1])) - } - req.commitTo, err = strconv.Atoi(string(args[2])) - if err != nil { - return protocol.MakeErrReply("illegal commitTo: " + string(args[2])) - } - if len(args) > 3 { - req.prevLogTerm, err = strconv.Atoi(string(args[3])) - if err != nil { - return protocol.MakeErrReply("illegal commitTo: " + string(args[3])) - } - req.prevLogIndex, err = strconv.Atoi(string(args[4])) - if err != nil { - return protocol.MakeErrReply("illegal commitTo: " + string(args[4])) - } - for _, bin := range args[5:] { - entry := &logEntry{} - err = entry.unmarshal(bin) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - req.entries = append(req.entries, entry) - } - } - return nil -} - -const prevLogMismatch = "prev log mismatch" -const nodeNotReady = "not ready" - -// execRaftHeartbeat receives heartbeat from leader -// command line: raft heartbeat nodeID term commitTo prevTerm prevIndex [log entry] -// returns term and received index -func execRaftHeartbeat(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - raft := cluster.asRaft() - req := &heartbeatRequest{} - unmarshalErr := req.unmarshal(args) - if unmarshalErr != nil { - return unmarshalErr - } - if req.term < raft.term { - return protocol.MakeMultiBulkReply(utils.ToCmdLine( - strconv.Itoa(req.term), - strconv.Itoa(raft.proposedIndex), // new received index - )) - } else if req.term > raft.term { - logger.Info("accept new leader " + req.leaderId) - raft.mu.Lock() - // todo: if current node is not at follower state - raft.term = req.term - raft.votedFor = "" - raft.leaderId = req.leaderId - raft.mu.Unlock() - } - raft.mu.RLock() - // heartbeat may arrive earlier than follower ready - if raft.heartbeatChan == nil { - raft.mu.RUnlock() - return protocol.MakeErrReply(nodeNotReady) - } - if len(req.entries) > 0 && compareLogIndex(req.prevLogTerm, req.prevLogIndex, raft.baseTerm, raft.baseIndex) != 0 { - raft.mu.RUnlock() - return protocol.MakeErrReply(prevLogMismatch) - } - raft.mu.RUnlock() - - raft.heartbeatChan <- &heartbeat{ - sender: req.leaderId, - term: req.term, - entries: req.entries, - commitTo: req.commitTo, - } - return protocol.MakeMultiBulkReply(utils.ToCmdLine( - strconv.Itoa(req.term), - strconv.Itoa(raft.proposedIndex+len(req.entries)), // new received index - )) -} - -// execRaftLoadSnapshot load snapshot from leader -// command line: raft load-snapshot leaderId snapshot(see raft.makeSnapshot) -func execRaftLoadSnapshot(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - // leaderId snapshot - if len(args) < 5 { - return protocol.MakeArgNumErrReply("raft load snapshot") - } - raft := cluster.asRaft() - raft.mu.Lock() - defer raft.mu.Unlock() - if errReply := raft.loadSnapshot(args[1:]); errReply != nil { - return errReply - } - sender := string(args[0]) - raft.heartbeatChan <- &heartbeat{ - sender: sender, - term: raft.term, - entries: nil, - commitTo: raft.committedIndex, - } - return protocol.MakeMultiBulkReply(utils.ToCmdLine( - strconv.Itoa(raft.term), - strconv.Itoa(raft.proposedIndex), - )) -} - -var wgPool = sync.Pool{ - New: func() interface{} { - return &sync.WaitGroup{} - }, -} - -// execRaftGetLeader returns leader id and address -func execRaftGetLeader(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - raft := cluster.asRaft() - raft.mu.RLock() - leaderNode := raft.nodes[raft.leaderId] - raft.mu.RUnlock() - return protocol.MakeMultiBulkReply(utils.ToCmdLine( - leaderNode.ID, - leaderNode.Addr, - )) -} - -// execRaftGetOffset returns log offset of current leader -func execRaftGetOffset(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - raft := cluster.asRaft() - raft.mu.RLock() - proposalIndex := raft.proposedIndex - //committedIndex := raft.committedIndex - raft.mu.RUnlock() - return protocol.MakeIntReply(int64(proposalIndex)) -} - -// invoker should provide with raft.mu lock -func (raft *Raft) persist() error { - if raft.persistFile == "" { - return nil - } - tmpFile, err := os.CreateTemp(config.Properties.Dir, "tmp-cluster-conf-*.conf") - if err != nil { - return err - } - snapshot := raft.makeSnapshot() - buf := bytes.NewBuffer(nil) - for _, line := range snapshot { - buf.Write(line) - buf.WriteByte('\n') - } - _, err = tmpFile.Write(buf.Bytes()) - if err != nil { - return err - } - err = os.Rename(tmpFile.Name(), raft.persistFile) - if err != nil { - return err - } - return nil -} - -// execRaftPropose handles requests from other nodes (follower or learner) to propose a change -// command line: raft propose -func execRaftPropose(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - raft := cluster.asRaft() - if raft.state != leader { - leaderNode := raft.nodes[raft.leaderId] - return protocol.MakeErrReply("NOT LEADER " + leaderNode.ID + " " + leaderNode.Addr) - } - if len(args) != 1 { - return protocol.MakeArgNumErrReply("raft propose") - } - - e := &logEntry{} - err := e.unmarshal(args[0]) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - if errReply := raft.propose(e); errReply != nil { - return errReply - } - return protocol.MakeOkReply() -} - -func (raft *Raft) propose(e *logEntry) protocol.ErrorReply { - switch e.Event { - case eventNewNode: - raft.mu.Lock() - _, ok := raft.nodes[e.Addr] - raft.mu.Unlock() - if ok { - return protocol.MakeErrReply("node exists") - } - } - wg := wgPool.Get().(*sync.WaitGroup) - defer wgPool.Put(wg) - e.wg = wg - raft.mu.Lock() - raft.proposedIndex++ - raft.log = append(raft.log, e) - raft.nodeIndexMap[raft.selfNodeID].receivedIndex = raft.proposedIndex - e.Term = raft.term - e.Index = raft.proposedIndex - raft.mu.Unlock() - e.wg.Add(1) - e.wg.Wait() // wait for the raft group to reach a consensus - return nil -} - -func (raft *Raft) Join(seed string) protocol.ErrorReply { - cluster := raft.cluster - - /* STEP1: get leader from seed */ - seedCli, err := cluster.clientFactory.GetPeerClient(seed) - if err != nil { - return protocol.MakeErrReply("connect with seed failed: " + err.Error()) - } - defer cluster.clientFactory.ReturnPeerClient(seed, seedCli) - ret := seedCli.Send(utils.ToCmdLine("raft", "get-leader")) - if protocol.IsErrorReply(ret) { - return ret.(protocol.ErrorReply) - } - leaderInfo, ok := ret.(*protocol.MultiBulkReply) - if !ok || len(leaderInfo.Args) != 2 { - return protocol.MakeErrReply("ERR get-leader returns wrong reply") - } - leaderAddr := string(leaderInfo.Args[1]) - - /* STEP2: join raft group */ - leaderCli, err := cluster.clientFactory.GetPeerClient(leaderAddr) - if err != nil { - return protocol.MakeErrReply("connect with seed failed: " + err.Error()) - } - defer cluster.clientFactory.ReturnPeerClient(leaderAddr, leaderCli) - ret = leaderCli.Send(utils.ToCmdLine("raft", "join", cluster.addr)) - if protocol.IsErrorReply(ret) { - return ret.(protocol.ErrorReply) - } - snapshot, ok := ret.(*protocol.MultiBulkReply) - if !ok || len(snapshot.Args) < 4 { - return protocol.MakeErrReply("ERR gcluster join returns wrong reply") - } - raft.mu.Lock() - defer raft.mu.Unlock() - if errReply := raft.loadSnapshot(snapshot.Args); errReply != nil { - return errReply - } - cluster.self = raft.selfNodeID - raft.start(follower) - return nil -} - -func (raft *Raft) LoadConfigFile() protocol.ErrorReply { - f, err := os.Open(raft.persistFile) - if err == os.ErrNotExist { - return errConfigFileNotExist - } - defer func() { - if err := f.Close(); err != nil { - logger.Errorf("close cloud config file error: %v", err) - } - }() - scanner := bufio.NewScanner(f) - var snapshot [][]byte - for scanner.Scan() { - line := append([]byte{}, scanner.Bytes()...) // copy the line... - snapshot = append(snapshot, line) - } - raft.mu.Lock() - defer raft.mu.Unlock() - if errReply := raft.loadSnapshot(snapshot); errReply != nil { - return errReply - } - raft.cluster.self = raft.selfNodeID - raft.start(raft.state) - return nil -} diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 194d2e0..5dc4925 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -187,3 +187,8 @@ func (node *Node) Propose(event *LogEntry) (uint64, error) { } return future.Index(), nil } + +func (node *Node) Close() error { + future := node.inner.Shutdown() + return fmt.Errorf("raft shutdown %v", future.Error()) +} \ No newline at end of file diff --git a/cluster/raft_event.go b/cluster/raft_event.go deleted file mode 100644 index ffa4a76..0000000 --- a/cluster/raft_event.go +++ /dev/null @@ -1,134 +0,0 @@ -package cluster - -// raft event handlers - -import ( - "errors" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" -) - -const ( - eventNewNode = iota + 1 - eventSetSlot -) - -// invoker should provide with raft.mu lock -func (raft *Raft) applyLogEntries(entries []*logEntry) { - for _, entry := range entries { - switch entry.Event { - case eventNewNode: - node := &Node{ - ID: entry.NodeID, - Addr: entry.Addr, - } - raft.nodes[node.ID] = node - if raft.state == leader { - raft.nodeIndexMap[entry.NodeID] = &nodeStatus{ - receivedIndex: entry.Index, // the new node should not receive its own join event - } - } - case eventSetSlot: - for _, slotID := range entry.SlotIDs { - slot := raft.slots[slotID] - oldNode := raft.nodes[slot.NodeID] - // remove from old oldNode - for i, s := range oldNode.Slots { - if s.ID == slot.ID { - copy(oldNode.Slots[i:], oldNode.Slots[i+1:]) - oldNode.Slots = oldNode.Slots[:len(oldNode.Slots)-1] - break - } - } - newNodeID := entry.NodeID - slot.NodeID = newNodeID - // fixme: 多个节点同时加入后 re balance 时 newNode 可能为 nil - newNode := raft.nodes[slot.NodeID] - newNode.Slots = append(newNode.Slots, slot) - } - } - } - if err := raft.persist(); err != nil { - logger.Errorf("persist raft error: %v", err) - } - -} - -// NewNode creates a new Node when a node request self node for joining cluster -func (raft *Raft) NewNode(addr string) (*Node, error) { - if _, ok := raft.nodes[addr]; ok { - return nil, errors.New("node existed") - } - node := &Node{ - ID: addr, - Addr: addr, - } - raft.nodes[node.ID] = node - proposal := &logEntry{ - Event: eventNewNode, - NodeID: node.ID, - Addr: node.Addr, - } - conn := connection.NewFakeConn() - resp := raft.cluster.relay(raft.leaderId, conn, - utils.ToCmdLine("raft", "propose", string(proposal.marshal()))) - if err, ok := resp.(protocol.ErrorReply); ok { - return nil, err - } - return node, nil -} - -// SetSlot propose -func (raft *Raft) SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply { - proposal := &logEntry{ - Event: eventSetSlot, - NodeID: newNodeID, - SlotIDs: slotIDs, - } - conn := connection.NewFakeConn() - resp := raft.cluster.relay(raft.leaderId, conn, - utils.ToCmdLine("raft", "propose", string(proposal.marshal()))) - if err, ok := resp.(protocol.ErrorReply); ok { - return err - } - return nil -} - -// execRaftJoin handles requests from a new node to join raft group, current node should be leader -// command line: raft join addr -func execRaftJoin(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) != 1 { - return protocol.MakeArgNumErrReply("raft join") - } - raft := cluster.asRaft() - if raft.state != leader { - leaderNode := raft.nodes[raft.leaderId] - return protocol.MakeErrReply("NOT LEADER " + leaderNode.ID + " " + leaderNode.Addr) - } - addr := string(args[0]) - nodeID := addr - - raft.mu.RLock() - _, exist := raft.nodes[addr] - raft.mu.RUnlock() - // if node has joint cluster but terminated before persisting cluster config, - // it may try to join at next start. - // In this case, we only have to send a snapshot for it - if !exist { - proposal := &logEntry{ - Event: eventNewNode, - NodeID: nodeID, - Addr: addr, - } - if err := raft.propose(proposal); err != nil { - return err - } - } - raft.mu.RLock() - snapshot := raft.makeSnapshotForFollower(nodeID) - raft.mu.RUnlock() - return protocol.MakeMultiBulkReply(snapshot) -} diff --git a/cluster/raft_snapshot.go b/cluster/raft_snapshot.go deleted file mode 100644 index b97145c..0000000 --- a/cluster/raft_snapshot.go +++ /dev/null @@ -1,204 +0,0 @@ -package cluster - -import ( - "encoding/json" - "fmt" - "github.com/hdt3213/godis/redis/protocol" - "sort" - "strconv" - "strings" -) - -// marshalSlotIds serializes slot ids -// For example, 1, 2, 3, 5, 7, 8 -> 1-3, 5, 7-8 -func marshalSlotIds(slots []*Slot) []string { - sort.Slice(slots, func(i, j int) bool { - return slots[i].ID < slots[j].ID - }) - // find continuous scopes - var scopes [][]uint32 - buf := make([]uint32, 2) - var scope []uint32 - for i, slot := range slots { - if len(scope) == 0 { // outside scope - if i+1 < len(slots) && - slots[i+1].ID == slot.ID+1 { // if continuous, then start one - scope = buf - scope[0] = slot.ID - } else { // discrete number - scopes = append(scopes, []uint32{slot.ID}) - } - } else { // within a scope - if i == len(slots)-1 || slots[i+1].ID != slot.ID+1 { // reach end or not continuous, stop current scope - scope[1] = slot.ID - scopes = append(scopes, []uint32{scope[0], scope[1]}) - scope = nil - } - } - - } - - // marshal scopes - result := make([]string, 0, len(scopes)) - for _, scope := range scopes { - if len(scope) == 1 { - s := strconv.Itoa(int(scope[0])) - result = append(result, s) - } else { // assert len(scope) == 2 - beg := strconv.Itoa(int(scope[0])) - end := strconv.Itoa(int(scope[1])) - result = append(result, beg+"-"+end) - } - } - return result -} - -// unmarshalSlotIds deserializes lines generated by marshalSlotIds -func unmarshalSlotIds(args []string) ([]uint32, error) { - var result []uint32 - for i, line := range args { - if pivot := strings.IndexByte(line, '-'); pivot > 0 { - // line is a scope - beg, err := strconv.Atoi(line[:pivot]) - if err != nil { - return nil, fmt.Errorf("illegal at slot line %d", i+1) - } - end, err := strconv.Atoi(line[pivot+1:]) - if err != nil { - return nil, fmt.Errorf("illegal at slot line %d", i+1) - } - for j := beg; j <= end; j++ { - result = append(result, uint32(j)) - } - } else { - // line is a number - v, err := strconv.Atoi(line) - if err != nil { - return nil, fmt.Errorf("illegal at slot line %d", i) - } - result = append(result, uint32(v)) - } - } - return result, nil -} - -type nodePayload struct { - ID string `json:"id"` - Addr string `json:"addr"` - SlotDesc []string `json:"slotDesc"` - Flags uint32 `json:"flags"` -} - -func marshalNodes(nodes map[string]*Node) [][]byte { - var args [][]byte - for _, node := range nodes { - slotLines := marshalSlotIds(node.Slots) - payload := &nodePayload{ - ID: node.ID, - Addr: node.Addr, - SlotDesc: slotLines, - Flags: node.Flags, - } - bin, _ := json.Marshal(payload) - args = append(args, bin) - } - return args -} - -func unmarshalNodes(args [][]byte) (map[string]*Node, error) { - nodeMap := make(map[string]*Node) - for i, bin := range args { - payload := &nodePayload{} - err := json.Unmarshal(bin, payload) - if err != nil { - return nil, fmt.Errorf("unmarshal node failed at line %d: %v", i+1, err) - } - slotIds, err := unmarshalSlotIds(payload.SlotDesc) - if err != nil { - return nil, err - } - node := &Node{ - ID: payload.ID, - Addr: payload.Addr, - Flags: payload.Flags, - } - for _, slotId := range slotIds { - node.Slots = append(node.Slots, &Slot{ - ID: slotId, - NodeID: node.ID, - Flags: 0, - }) - } - nodeMap[node.ID] = node - } - return nodeMap, nil -} - -// genSnapshot -// invoker provide lock -func (raft *Raft) makeSnapshot() [][]byte { - topology := marshalNodes(raft.nodes) - snapshot := [][]byte{ - []byte(raft.selfNodeID), - []byte(strconv.Itoa(int(raft.state))), - []byte(raft.leaderId), - []byte(strconv.Itoa(raft.term)), - []byte(strconv.Itoa(raft.committedIndex)), - } - snapshot = append(snapshot, topology...) - return snapshot -} - -// makeSnapshotForFollower used by leader node to generate snapshot for follower -// invoker provide with lock -func (raft *Raft) makeSnapshotForFollower(followerId string) [][]byte { - snapshot := raft.makeSnapshot() - snapshot[0] = []byte(followerId) - snapshot[1] = []byte(strconv.Itoa(int(follower))) - return snapshot -} - -// invoker provide with lock -func (raft *Raft) loadSnapshot(snapshot [][]byte) protocol.ErrorReply { - // make sure raft.slots and node.Slots is the same object - selfNodeId := string(snapshot[0]) - state0, err := strconv.Atoi(string(snapshot[1])) - if err != nil { - return protocol.MakeErrReply("illegal state: " + string(snapshot[1])) - } - state := raftState(state0) - if _, ok := stateNames[state]; !ok { - return protocol.MakeErrReply("unknown state: " + strconv.Itoa(int(state))) - } - leaderId := string(snapshot[2]) - term, err := strconv.Atoi(string(snapshot[3])) - if err != nil { - return protocol.MakeErrReply("illegal term: " + string(snapshot[3])) - } - commitIndex, err := strconv.Atoi(string(snapshot[4])) - if err != nil { - return protocol.MakeErrReply("illegal commit index: " + string(snapshot[3])) - } - nodes, err := unmarshalNodes(snapshot[5:]) - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - raft.selfNodeID = selfNodeId - raft.state = state - raft.leaderId = leaderId - raft.term = term - raft.committedIndex = commitIndex - raft.proposedIndex = commitIndex - raft.initLog(term, commitIndex, nil) - raft.slots = make([]*Slot, slotCount) - for _, node := range nodes { - for _, slot := range node.Slots { - raft.slots[int(slot.ID)] = slot - } - if node.getState() == leader { - raft.leaderId = node.ID - } - } - raft.nodes = nodes - return nil -} diff --git a/cluster/raft_snapshot_test.go b/cluster/raft_snapshot_test.go deleted file mode 100644 index 916b1b5..0000000 --- a/cluster/raft_snapshot_test.go +++ /dev/null @@ -1 +0,0 @@ -package cluster diff --git a/cluster/rename.go b/cluster/rename.go deleted file mode 100644 index b663867..0000000 --- a/cluster/rename.go +++ /dev/null @@ -1,140 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/protocol" - "strconv" -) - -// Rename renames a key, the origin and the destination must within the same node -func Rename(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply { - if len(cmdLine) != 3 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command") - } - srcKey := string(cmdLine[1]) - destKey := string(cmdLine[2]) - srcNode := cluster.pickNodeAddrByKey(srcKey) - destNode := cluster.pickNodeAddrByKey(destKey) - if srcNode == destNode { // do fast - return cluster.relay(srcNode, c, modifyCmd(cmdLine, "Rename_")) - } - groupMap := map[string][]string{ - srcNode: {srcKey}, - destNode: {destKey}, - } - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - // prepare rename from - srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)) - if protocol.IsErrorReply(srcPrepareResp) { - // rollback src node - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return srcPrepareResp - } - srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply) - if !ok || len(srcPrepareMBR.Args) < 2 { - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return protocol.MakeErrReply("ERR invalid prepare response") - } - // prepare rename to - destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr), - []byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])) - if protocol.IsErrorReply(destPrepareResp) { - // rollback src node - requestRollback(cluster, c, txID, groupMap) - return destPrepareResp - } - if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil { - requestRollback(cluster, c, txID, groupMap) - return errReply - } - return protocol.MakeOkReply() -} - -// prepareRenameFrom is prepare-function for RenameFrom, see prepareFuncMap -func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 2 { - return protocol.MakeArgNumErrReply("RenameFrom") - } - key := string(cmdLine[1]) - existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) - if protocol.IsErrorReply(existResp) { - return existResp - } - existIntResp := existResp.(*protocol.IntReply) - if existIntResp.Code == 0 { - return protocol.MakeErrReply("ERR no such key") - } - return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key)) -} - -func prepareRenameNxTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 4 { - return protocol.MakeArgNumErrReply("RenameNxTo") - } - key := string(cmdLine[1]) - existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) - if protocol.IsErrorReply(existResp) { - return existResp - } - existIntResp := existResp.(*protocol.IntReply) - if existIntResp.Code == 1 { - return protocol.MakeErrReply(keyExistsErr) - } - return protocol.MakeOkReply() -} - -func init() { - registerPrepareFunc("RenameFrom", prepareRenameFrom) - registerPrepareFunc("RenameNxTo", prepareRenameNxTo) -} - -// RenameNx renames a key, only if the new key does not exist. -// The origin and the destination must within the same node -func RenameNx(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply { - if len(cmdLine) != 3 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'renamenx' command") - } - srcKey := string(cmdLine[1]) - destKey := string(cmdLine[2]) - srcNode := cluster.pickNodeAddrByKey(srcKey) - destNode := cluster.pickNodeAddrByKey(destKey) - if srcNode == destNode { - return cluster.relay(srcNode, c, modifyCmd(cmdLine, "RenameNX_")) - } - groupMap := map[string][]string{ - srcNode: {srcKey}, - destNode: {destKey}, - } - txID := cluster.idGenerator.NextID() - txIDStr := strconv.FormatInt(txID, 10) - // prepare rename from - srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)) - if protocol.IsErrorReply(srcPrepareResp) { - // rollback src node - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return srcPrepareResp - } - srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply) - if !ok || len(srcPrepareMBR.Args) < 2 { - requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) - return protocol.MakeErrReply("ERR invalid prepare response") - } - // prepare rename to - destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr), - []byte("RenameNxTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])) - if protocol.IsErrorReply(destPrepareResp) { - // rollback src node - requestRollback(cluster, c, txID, groupMap) - if re := destPrepareResp.(protocol.ErrorReply); re.Error() == keyExistsErr { - return protocol.MakeIntReply(0) - } - return destPrepareResp - } - if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil { - requestRollback(cluster, c, txID, groupMap) - return errReply - } - return protocol.MakeIntReply(1) -} diff --git a/cluster/rename_test.go b/cluster/rename_test.go deleted file mode 100644 index 966a939..0000000 --- a/cluster/rename_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "testing" -) - -func TestRename(t *testing.T) { - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - - // cross node rename - for i := 0; i < 10; i++ { - testNodeA.Exec(conn, utils.ToCmdLine("FlushALL")) - key := utils.RandString(10) - value := utils.RandString(10) - newKey := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "100000")) - result := testNodeA.Exec(conn, utils.ToCmdLine("RENAME", key, newKey)) - asserts.AssertStatusReply(t, result, "OK") - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key)) - asserts.AssertIntReply(t, result, 0) - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", newKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - } -} - -func TestRenameNx(t *testing.T) { - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - - // cross node rename - for i := 0; i < 10; i++ { - testNodeA.Exec(conn, utils.ToCmdLine("FlushALL")) - key := utils.RandString(10) - value := utils.RandString(10) - newKey := utils.RandString(10) - testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "100000")) - result := testNodeA.Exec(conn, utils.ToCmdLine("RENAMENX", key, newKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key)) - asserts.AssertIntReply(t, result, 0) - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) - asserts.AssertIntReply(t, result, 1) - result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", newKey)) - asserts.AssertIntReplyGreaterThan(t, result, 0) - - value2 := value + "ccc" - testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value2, "ex", "100000")) - result = testNodeA.Exec(conn, utils.ToCmdLine("RENAMENX", key, newKey)) - asserts.AssertIntReply(t, result, 0) - result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key)) - asserts.AssertIntReply(t, result, 1) - } -} diff --git a/cluster/router.go b/cluster/router.go deleted file mode 100644 index e8778be..0000000 --- a/cluster/router.go +++ /dev/null @@ -1,166 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" - "strings" -) - -// CmdLine is alias for [][]byte, represents a command line -type CmdLine = [][]byte - -var router = make(map[string]CmdFunc) - -func registerCmd(name string, cmd CmdFunc) { - name = strings.ToLower(name) - router[name] = cmd -} - -func registerDefaultCmd(name string) { - registerCmd(name, defaultFunc) -} - -// relay command to responsible peer, and return its protocol to client -func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - key := string(args[1]) - slotId := getSlot(key) - peer := cluster.pickNode(slotId) - if peer.ID == cluster.self { - err := cluster.ensureKeyWithoutLock(key) - if err != nil { - return err - } - // to self db - //return cluster.db.Exec(c, cmdLine) - return cluster.db.Exec(c, args) - } - return cluster.relay(peer.ID, c, args) -} - -func init() { - registerCmd("Ping", ping) - registerCmd("Prepare", execPrepare) - registerCmd("Commit", execCommit) - registerCmd("Rollback", execRollback) - registerCmd("Del", Del) - registerCmd("Rename", Rename) - registerCmd("RenameNx", RenameNx) - registerCmd("Copy", Copy) - registerCmd("MSet", MSet) - registerCmd("MGet", MGet) - registerCmd("MSetNx", MSetNX) - registerCmd("Publish", Publish) - registerCmd("Subscribe", Subscribe) - registerCmd("Unsubscribe", UnSubscribe) - registerCmd("FlushDB", FlushDB) - registerCmd("FlushAll", FlushAll) - registerCmd(relayMulti, execRelayedMulti) - registerCmd("Watch", execWatch) - registerCmd("FlushDB_", genPenetratingExecutor("FlushDB")) - registerCmd("Copy_", genPenetratingExecutor("Copy")) - registerCmd("Watch_", genPenetratingExecutor("Watch")) - registerCmd(relayPublish, genPenetratingExecutor("Publish")) - registerCmd("Del_", genPenetratingExecutor("Del")) - registerCmd("MSet_", genPenetratingExecutor("MSet")) - registerCmd("MSetNx_", genPenetratingExecutor("MSetNx")) - registerCmd("MGet_", genPenetratingExecutor("MGet")) - registerCmd("Rename_", genPenetratingExecutor("Rename")) - registerCmd("RenameNx_", genPenetratingExecutor("RenameNx")) - registerCmd("DumpKey_", genPenetratingExecutor("DumpKey")) - - defaultCmds := []string{ - "expire", - "expireAt", - "pExpire", - "pExpireAt", - "ttl", - "PTtl", - "persist", - "exists", - "type", - "set", - "setNx", - "setEx", - "pSetEx", - "get", - "getEx", - "getSet", - "getDel", - "incr", - "incrBy", - "incrByFloat", - "decr", - "decrBy", - "lPush", - "lPushX", - "rPush", - "rPushX", - "LPop", - "RPop", - "LRem", - "LLen", - "LIndex", - "LSet", - "LRange", - "HSet", - "HSetNx", - "HGet", - "HExists", - "HDel", - "HLen", - "HStrLen", - "HMGet", - "HMSet", - "HKeys", - "HVals", - "HGetAll", - "HIncrBy", - "HIncrByFloat", - "HRandField", - "SAdd", - "SIsMember", - "SRem", - "SPop", - "SCard", - "SMembers", - "SInter", - "SInterStore", - "SUnion", - "SUnionStore", - "SDiff", - "SDiffStore", - "SRandMember", - "ZAdd", - "ZScore", - "ZIncrBy", - "ZRank", - "ZCount", - "ZRevRank", - "ZCard", - "ZRange", - "ZRevRange", - "ZRangeByScore", - "ZRevRangeByScore", - "ZRem", - "ZRemRangeByScore", - "ZRemRangeByRank", - "GeoAdd", - "GeoPos", - "GeoDist", - "GeoHash", - "GeoRadius", - "GeoRadiusByMember", - "GetVer", - "DumpKey", - } - for _, name := range defaultCmds { - registerDefaultCmd(name) - } - -} - -// genPenetratingExecutor generates an executor that can reach directly to the database layer -func genPenetratingExecutor(realCmd string) CmdFunc { - return func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - return cluster.db.Exec(c, modifyCmd(cmdLine, realCmd)) - } -} diff --git a/cluster/tcc.go b/cluster/tcc.go deleted file mode 100644 index b4ffe7c..0000000 --- a/cluster/tcc.go +++ /dev/null @@ -1,253 +0,0 @@ -package cluster - -import ( - "fmt" - "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/timewheel" - "github.com/hdt3213/godis/redis/protocol" - "strconv" - "strings" - "sync" - "time" -) - -// prepareFunc executed after related key locked, and use additional logic to determine whether the transaction can be committed -// For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists -var prepareFuncMap = make(map[string]CmdFunc) - -func registerPrepareFunc(cmdName string, fn CmdFunc) { - prepareFuncMap[strings.ToLower(cmdName)] = fn -} - -// Transaction stores state and data for a try-commit-catch distributed transaction -type Transaction struct { - id string // transaction id - cmdLine [][]byte // cmd cmdLine - cluster *Cluster - conn redis.Connection - dbIndex int - - writeKeys []string - readKeys []string - keysLocked bool - undoLog []CmdLine - - status int8 - mu *sync.Mutex -} - -const ( - maxLockTime = 3 * time.Second - waitBeforeCleanTx = 2 * maxLockTime - - createdStatus = 0 - preparedStatus = 1 - committedStatus = 2 - rolledBackStatus = 3 -) - -func genTaskKey(txID string) string { - return "tx:" + txID -} - -// NewTransaction creates a try-commit-catch distributed transaction -func NewTransaction(cluster *Cluster, c redis.Connection, id string, cmdLine [][]byte) *Transaction { - return &Transaction{ - id: id, - cmdLine: cmdLine, - cluster: cluster, - conn: c, - dbIndex: c.GetDBIndex(), - status: createdStatus, - mu: new(sync.Mutex), - } -} - -// Reentrant -// invoker should hold tx.mu -func (tx *Transaction) lockKeys() { - if !tx.keysLocked { - tx.cluster.db.RWLocks(tx.dbIndex, tx.writeKeys, tx.readKeys) - tx.keysLocked = true - } -} - -func (tx *Transaction) unLockKeys() { - if tx.keysLocked { - tx.cluster.db.RWUnLocks(tx.dbIndex, tx.writeKeys, tx.readKeys) - tx.keysLocked = false - } -} - -// t should contain Keys and ID field -func (tx *Transaction) prepare() error { - tx.mu.Lock() - defer tx.mu.Unlock() - - tx.writeKeys, tx.readKeys = database.GetRelatedKeys(tx.cmdLine) - // lock writeKeys - tx.lockKeys() - - for _, key := range tx.writeKeys { - err := tx.cluster.ensureKey(key) - if err != nil { - return err - } - } - for _, key := range tx.readKeys { - err := tx.cluster.ensureKey(key) - if err != nil { - return err - } - } - // build undoLog - tx.undoLog = tx.cluster.db.GetUndoLogs(tx.dbIndex, tx.cmdLine) - tx.status = preparedStatus - taskKey := genTaskKey(tx.id) - timewheel.Delay(maxLockTime, taskKey, func() { - if tx.status == preparedStatus { // rollback transaction uncommitted until expire - logger.Info("abort transaction: " + tx.id) - tx.mu.Lock() - defer tx.mu.Unlock() - _ = tx.rollbackWithLock() - } - }) - return nil -} - -func (tx *Transaction) rollbackWithLock() error { - curStatus := tx.status - - if tx.status != curStatus { // ensure status not changed by other goroutine - return fmt.Errorf("tx %s status changed", tx.id) - } - if tx.status == rolledBackStatus { // no need to rollback a rolled-back transaction - return nil - } - tx.lockKeys() - for _, cmdLine := range tx.undoLog { - tx.cluster.db.ExecWithLock(tx.conn, cmdLine) - } - tx.unLockKeys() - tx.status = rolledBackStatus - return nil -} - -// cmdLine: Prepare id cmdName args... -func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) < 3 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'prepare' command") - } - txID := string(cmdLine[1]) - cmdName := strings.ToLower(string(cmdLine[2])) - tx := NewTransaction(cluster, c, txID, cmdLine[2:]) - cluster.transactionMu.Lock() - cluster.transactions.Put(txID, tx) - cluster.transactionMu.Unlock() - err := tx.prepare() - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - prepareFunc, ok := prepareFuncMap[cmdName] - if ok { - return prepareFunc(cluster, c, cmdLine[2:]) - } - return &protocol.OkReply{} -} - -// execRollback rollbacks local transaction -func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 2 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'rollback' command") - } - txID := string(cmdLine[1]) - cluster.transactionMu.RLock() - raw, ok := cluster.transactions.Get(txID) - cluster.transactionMu.RUnlock() - if !ok { - return protocol.MakeIntReply(0) - } - tx, _ := raw.(*Transaction) - - tx.mu.Lock() - defer tx.mu.Unlock() - err := tx.rollbackWithLock() - if err != nil { - return protocol.MakeErrReply(err.Error()) - } - // clean transaction - timewheel.Delay(waitBeforeCleanTx, "", func() { - cluster.transactionMu.Lock() - cluster.transactions.Remove(tx.id) - cluster.transactionMu.Unlock() - }) - return protocol.MakeIntReply(1) -} - -// execCommit commits local transaction as a worker when receive execCommit command from coordinator -func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 2 { - return protocol.MakeErrReply("ERR wrong number of arguments for 'commit' command") - } - txID := string(cmdLine[1]) - cluster.transactionMu.RLock() - raw, ok := cluster.transactions.Get(txID) - cluster.transactionMu.RUnlock() - if !ok { - return protocol.MakeIntReply(0) - } - tx, _ := raw.(*Transaction) - - tx.mu.Lock() - defer tx.mu.Unlock() - - result := cluster.db.ExecWithLock(c, tx.cmdLine) - - if protocol.IsErrorReply(result) { - // failed - err2 := tx.rollbackWithLock() - return protocol.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result)) - } - // after committed - tx.unLockKeys() - tx.status = committedStatus - // clean finished transaction - // do not clean immediately, in case rollback - timewheel.Delay(waitBeforeCleanTx, "", func() { - cluster.transactionMu.Lock() - cluster.transactions.Remove(tx.id) - cluster.transactionMu.Unlock() - }) - return result -} - -// requestCommit commands all node to commit transaction as coordinator -func requestCommit(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) ([]redis.Reply, protocol.ErrorReply) { - var errReply protocol.ErrorReply - txIDStr := strconv.FormatInt(txID, 10) - respList := make([]redis.Reply, 0, len(groupMap)) - for node := range groupMap { - resp := cluster.relay(node, c, makeArgs("commit", txIDStr)) - if protocol.IsErrorReply(resp) { - errReply = resp.(protocol.ErrorReply) - break - } - respList = append(respList, resp) - } - if errReply != nil { - requestRollback(cluster, c, txID, groupMap) - return nil, errReply - } - return respList, nil -} - -// requestRollback requests all node rollback transaction as coordinator -// groupMap: node -> keys -func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) { - txIDStr := strconv.FormatInt(txID, 10) - for node := range groupMap { - cluster.relay(node, c, makeArgs("rollback", txIDStr)) - } -} diff --git a/cluster/tcc_test.go b/cluster/tcc_test.go deleted file mode 100644 index 11ee3a3..0000000 --- a/cluster/tcc_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "math/rand" - "strconv" - "testing" -) - -func TestRollback(t *testing.T) { - // rollback uncommitted transaction - testNodeA := testCluster[0] - conn := new(connection.FakeConn) - FlushAll(testNodeA, conn, toArgs("FLUSHALL")) - txID := rand.Int63() - txIDStr := strconv.FormatInt(txID, 10) - keys := []string{"a", "{a}1"} - groupMap := map[string][]string{ - testNodeA.self: keys, - } - args := []string{txIDStr, "DEL"} - args = append(args, keys...) - testNodeA.db.Exec(conn, toArgs("SET", "a", "a")) - ret := execPrepare(testNodeA, conn, makeArgs("Prepare", args...)) - asserts.AssertNotError(t, ret) - requestRollback(testNodeA, conn, txID, groupMap) - ret = testNodeA.db.Exec(conn, toArgs("GET", "a")) - asserts.AssertBulkReply(t, ret, "a") - - // rollback committed transaction - FlushAll(testNodeA, conn, toArgs("FLUSHALL")) - testNodeA.db.Exec(conn, toArgs("SET", "a", "a")) - txID = rand.Int63() - txIDStr = strconv.FormatInt(txID, 10) - args = []string{txIDStr, "DEL"} - args = append(args, keys...) - ret = execPrepare(testNodeA, conn, makeArgs("Prepare", args...)) - asserts.AssertNotError(t, ret) - _, err := requestCommit(testNodeA, conn, txID, groupMap) - if err != nil { - t.Errorf("del failed %v", err) - return - } - ret = testNodeA.db.Exec(conn, toArgs("GET", "a")) // call db.Exec to skip key router - asserts.AssertNullBulk(t, ret) - requestRollback(testNodeA, conn, txID, groupMap) - ret = testNodeA.db.Exec(conn, toArgs("GET", "a")) - asserts.AssertBulkReply(t, ret, "a") -} diff --git a/cluster/topo.go b/cluster/topo.go deleted file mode 100644 index 316715f..0000000 --- a/cluster/topo.go +++ /dev/null @@ -1,189 +0,0 @@ -package cluster - -import ( - "fmt" - "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" - "strconv" - "time" -) - -func (cluster *Cluster) startAsSeed(listenAddr string) protocol.ErrorReply { - err := cluster.topology.StartAsSeed(listenAddr) - if err != nil { - return err - } - for i := 0; i < slotCount; i++ { - cluster.initSlot(uint32(i), slotStateHost) - } - return nil -} - -// Join send `gcluster join` to node in cluster to join -func (cluster *Cluster) Join(seed string) protocol.ErrorReply { - err := cluster.topology.Join(seed) - if err != nil { - return nil - } - /* STEP3: asynchronous migrating slots */ - go func() { - time.Sleep(time.Second) // let the cluster started - cluster.reBalance() - }() - return nil -} - -var errConfigFileNotExist = protocol.MakeErrReply("cluster config file not exist") - -// LoadConfig try to load cluster-config-file and re-join the cluster -func (cluster *Cluster) LoadConfig() protocol.ErrorReply { - err := cluster.topology.LoadConfigFile() - if err != nil { - return err - } - selfNodeId := cluster.topology.GetSelfNodeID() - selfNode := cluster.topology.GetNode(selfNodeId) - if selfNode == nil { - return protocol.MakeErrReply("ERR self node info not found") - } - for _, slot := range selfNode.Slots { - cluster.initSlot(slot.ID, slotStateHost) - } - return nil -} - -func (cluster *Cluster) reBalance() { - nodes := cluster.topology.GetNodes() - var slotIDs []uint32 - var slots []*Slot - reqDonateCmdLine := utils.ToCmdLine("gcluster", "request-donate", cluster.self) - for _, node := range nodes { - if node.ID == cluster.self { - continue - } - node := node - peerCli, err := cluster.clientFactory.GetPeerClient(node.Addr) - if err != nil { - logger.Errorf("get client of %s failed: %v", node.Addr, err) - continue - } - resp := peerCli.Send(reqDonateCmdLine) - payload, ok := resp.(*protocol.MultiBulkReply) - if !ok { - logger.Errorf("request donate to %s failed: %v", node.Addr, err) - continue - } - for _, bin := range payload.Args { - slotID64, err := strconv.ParseUint(string(bin), 10, 64) - if err != nil { - continue - } - slotID := uint32(slotID64) - slotIDs = append(slotIDs, slotID) - slots = append(slots, &Slot{ - ID: slotID, - NodeID: node.ID, - }) - // Raft cannot guarantee the simultaneity and order of submissions to the source and destination nodes - // In some cases the source node thinks the slot belongs to the destination node, and the destination node thinks the slot belongs to the source node - // To avoid it, the source node and the destination node must reach a consensus before propose to raft - cluster.setLocalSlotImporting(slotID, node.ID) - } - } - if len(slots) == 0 { - return - } - logger.Infof("received %d donated slots", len(slots)) - - // change route - err := cluster.topology.SetSlot(slotIDs, cluster.self) - if err != nil { - logger.Errorf("set slot route failed: %v", err) - return - } - slotChan := make(chan *Slot, len(slots)) - for _, slot := range slots { - slotChan <- slot - } - close(slotChan) - for i := 0; i < 4; i++ { - i := i - go func() { - for slot := range slotChan { - logger.Info("start import slot ", slot.ID) - err := cluster.importSlot(slot) - if err != nil { - logger.Error(fmt.Sprintf("import slot %d error: %v", slot.ID, err)) - // delete all imported keys in slot - cluster.cleanDroppedSlot(slot.ID) - // todo: recover route - return - } - logger.Infof("finish import slot: %d, about %d slots remains", slot.ID, len(slotChan)) - } - logger.Infof("import worker %d exited", i) - }() - } -} - -// importSlot do migrate slot into current node -// the pseudo `slot` parameter is used to store slotID and former host node -func (cluster *Cluster) importSlot(slot *Slot) error { - node := cluster.topology.GetNode(slot.NodeID) - - /* get migrate stream */ - migrateCmdLine := utils.ToCmdLine( - "gcluster", "migrate", strconv.Itoa(int(slot.ID))) - migrateStream, err := cluster.clientFactory.NewStream(node.Addr, migrateCmdLine) - if err != nil { - return err - } - defer migrateStream.Close() - - fakeConn := connection.NewFakeConn() -slotLoop: - for proto := range migrateStream.Stream() { - if proto.Err != nil { - return fmt.Errorf("set slot %d error: %v", slot.ID, err) - } - switch reply := proto.Data.(type) { - case *protocol.MultiBulkReply: - // todo: handle exec error - keys, _ := database.GetRelatedKeys(reply.Args) - // assert len(keys) == 1 - key := keys[0] - // key may be imported by Cluster.ensureKey or by former failed migrating try - if !cluster.isImportedKey(key) { - cluster.setImportedKey(key) - _ = cluster.db.Exec(fakeConn, reply.Args) - } - case *protocol.StatusReply: - if protocol.IsOKReply(reply) { - break slotLoop - } else { - // todo: return slot to former host node - msg := fmt.Sprintf("migrate slot %d error: %s", slot.ID, reply.Status) - logger.Errorf(msg) - return protocol.MakeErrReply(msg) - } - case protocol.ErrorReply: - // todo: return slot to former host node - msg := fmt.Sprintf("migrate slot %d error: %s", slot.ID, reply.Error()) - logger.Errorf(msg) - return protocol.MakeErrReply(msg) - } - } - cluster.finishSlotImport(slot.ID) - - // finish migration mode - peerCli, err := cluster.clientFactory.GetPeerClient(node.Addr) - if err != nil { - return err - } - defer cluster.clientFactory.ReturnPeerClient(node.Addr, peerCli) - peerCli.Send(utils.ToCmdLine("gcluster", "migrate-done", strconv.Itoa(int(slot.ID)))) - return nil -} diff --git a/cluster/topo_gcluster.go b/cluster/topo_gcluster.go deleted file mode 100644 index 8b9fbc1..0000000 --- a/cluster/topo_gcluster.go +++ /dev/null @@ -1,146 +0,0 @@ -package cluster - -import ( - "fmt" - "github.com/hdt3213/godis/aof" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/redis/protocol" - "strconv" - "strings" -) - -func init() { - registerCmd("gcluster", execGCluster) -} - -func execGCluster(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) < 2 { - return protocol.MakeArgNumErrReply("gcluster") - } - subCmd := strings.ToLower(string(args[1])) - switch subCmd { - case "set-slot": - // Command line: gcluster set-slot - // Other node request current node to migrate a slot to it. - // Current node will set the slot as migrating state. - // After this function return, all requests of target slot will be routed to target node - return execGClusterSetSlot(cluster, c, args[2:]) - case "migrate": - // Command line: gcluster migrate - // Current node will dump the given slot to the node sending this request - // The given slot must in migrating state - return execGClusterMigrate(cluster, c, args[2:]) - case "migrate-done": - // command line: gcluster migrate-done - // The new node hosting given slot tells current node that migration has finished, remains data can be deleted - return execGClusterMigrateDone(cluster, c, args[2:]) - case "request-donate": - // command line: gcluster donate - // picks some slots and gives them to the calling node for load balance - return execGClusterDonateSlot(cluster, c, args[2:]) - } - return protocol.MakeErrReply(" ERR unknown gcluster sub command '" + subCmd + "'") -} - -// execGClusterSetSlot set a current node hosted slot as migrating -// args is [slotID, newNodeId] -func execGClusterSetSlot(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) != 2 { - return protocol.MakeArgNumErrReply("gcluster") - } - slotId0, err := strconv.Atoi(string(args[0])) - if err != nil || slotId0 >= slotCount { - return protocol.MakeErrReply("ERR value is not a valid slot id") - } - slotId := uint32(slotId0) - targetNodeID := string(args[1]) - targetNode := cluster.topology.GetNode(targetNodeID) - if targetNode == nil { - return protocol.MakeErrReply("ERR node not found") - } - cluster.setSlotMovingOut(slotId, targetNodeID) - logger.Info(fmt.Sprintf("set slot %d to node %s", slotId, targetNodeID)) - return protocol.MakeOkReply() -} - -// execGClusterDonateSlot picks some slots and gives them to the calling node for load balance -// args is [callingNodeId] -func execGClusterDonateSlot(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - targetNodeID := string(args[0]) - nodes := cluster.topology.GetNodes() // including the new node - avgSlot := slotCount / len(nodes) - cluster.slotMu.Lock() - defer cluster.slotMu.Unlock() - limit := len(cluster.slots) - avgSlot - if limit <= 0 { - return protocol.MakeEmptyMultiBulkReply() - } - result := make([][]byte, 0, limit) - // use the randomness of the for-each-in-map to randomly select slots - for slotID, slot := range cluster.slots { - if slot.state == slotStateHost { - slot.state = slotStateMovingOut - slot.newNodeID = targetNodeID - slotIDBin := []byte(strconv.FormatUint(uint64(slotID), 10)) - result = append(result, slotIDBin) - if len(result) == limit { - break - } - } - } - return protocol.MakeMultiBulkReply(result) -} - -// execGClusterMigrate Command line: gcluster migrate slotId -// Current node will dump data in the given slot to the node sending this request -// The given slot must in migrating state -func execGClusterMigrate(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - slotId0, err := strconv.Atoi(string(args[0])) - if err != nil || slotId0 >= slotCount { - return protocol.MakeErrReply("ERR value is not a valid slot id") - } - slotId := uint32(slotId0) - slot := cluster.getHostSlot(slotId) - if slot == nil || slot.state != slotStateMovingOut { - return protocol.MakeErrReply("ERR only dump migrating slot") - } - // migrating slot is immutable - logger.Info("start dump slot", slotId) - slot.keys.ForEach(func(key string) bool { - entity, ok := cluster.db.GetEntity(0, key) - if ok { - ret := aof.EntityToCmd(key, entity) - // todo: handle error and close connection - _, _ = c.Write(ret.ToBytes()) - expire := cluster.db.GetExpiration(0, key) - if expire != nil { - ret = aof.MakeExpireCmd(key, *expire) - _, _ = c.Write(ret.ToBytes()) - } - - } - return true - }) - logger.Info("finish dump slot ", slotId) - // send a ok reply to tell requesting node dump finished - return protocol.MakeOkReply() -} - -// execGClusterMigrateDone command line: gcluster migrate-done -func execGClusterMigrateDone(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - slotId0, err := strconv.Atoi(string(args[0])) - if err != nil || slotId0 >= slotCount { - return protocol.MakeErrReply("ERR value is not a valid slot id") - } - slotId := uint32(slotId0) - slot := cluster.getHostSlot(slotId) - if slot == nil || slot.state != slotStateMovingOut { - return protocol.MakeErrReply("ERR slot is not moving out") - } - cluster.cleanDroppedSlot(slotId) - cluster.slotMu.Lock() - delete(cluster.slots, slotId) - cluster.slotMu.Unlock() - return protocol.MakeOkReply() -} diff --git a/cluster/topo_interface.go b/cluster/topo_interface.go deleted file mode 100644 index 63103f2..0000000 --- a/cluster/topo_interface.go +++ /dev/null @@ -1,58 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/redis/protocol" - "hash/crc32" - "strings" - "time" -) - -// Slot represents a hash slot, used in cluster internal messages -type Slot struct { - // ID is uint between 0 and 16383 - ID uint32 - // NodeID is id of the hosting node - // If the slot is migrating, NodeID is the id of the node importing this slot (target node) - NodeID string - // Flags stores more information of slot - Flags uint32 -} - -// getPartitionKey extract hashtag -func getPartitionKey(key string) string { - beg := strings.Index(key, "{") - if beg == -1 { - return key - } - end := strings.Index(key, "}") - if end == -1 || end == beg+1 { - return key - } - return key[beg+1 : end] -} - -func getSlot(key string) uint32 { - partitionKey := getPartitionKey(key) - return crc32.ChecksumIEEE([]byte(partitionKey)) % uint32(slotCount) -} - -// Node represents a node and its slots, used in cluster internal messages -type Node struct { - ID string - Addr string - Slots []*Slot // ascending order by slot id - Flags uint32 - lastHeard time.Time -} - -type topology interface { - GetSelfNodeID() string - GetNodes() []*Node // return a copy - GetNode(nodeID string) *Node - GetSlots() []*Slot - StartAsSeed(addr string) protocol.ErrorReply - SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply - LoadConfigFile() protocol.ErrorReply - Join(seed string) protocol.ErrorReply - Close() error -} diff --git a/cluster/topo_test.go b/cluster/topo_test.go deleted file mode 100644 index d1ebfa0..0000000 --- a/cluster/topo_test.go +++ /dev/null @@ -1,152 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/config" - database2 "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/datastruct/dict" - "github.com/hdt3213/godis/lib/idgenerator" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol/asserts" - "path" - "strconv" - "testing" - "time" -) - -func makeTestRaft(addresses []string, timeoutFlags []bool, persistFilenames []string) ([]*Cluster, error) { - nodes := make([]*Cluster, len(addresses)) - factory := &testClientFactory{ - nodes: nodes, - timeoutFlags: timeoutFlags, - } - for i, addr := range addresses { - addr := addr - nodes[i] = &Cluster{ - self: addr, - addr: addr, - db: database2.NewStandaloneServer(), - transactions: dict.MakeSimple(), - idGenerator: idgenerator.MakeGenerator(config.Properties.Self), - clientFactory: factory, - slots: make(map[uint32]*hostSlot), - } - topologyPersistFile := persistFilenames[i] - nodes[i].topology = newRaft(nodes[i], topologyPersistFile) - } - - err := nodes[0].startAsSeed(addresses[0]) - if err != nil { - return nil, err - } - err = nodes[1].Join(addresses[0]) - if err != nil { - return nil, err - } - err = nodes[2].Join(addresses[0]) - if err != nil { - return nil, err - } - return nodes, nil -} - -func TestRaftStart(t *testing.T) { - addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"} - timeoutFlags := []bool{false, false, false} - persistFilenames := []string{"", "", ""} - nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames) - if err != nil { - t.Error(err) - return - } - if nodes[0].asRaft().state != leader { - t.Error("expect leader") - return - } - if nodes[1].asRaft().state != follower { - t.Error("expect follower") - return - } - if nodes[2].asRaft().state != follower { - t.Error("expect follower") - return - } - size := 100 - conn := connection.NewFakeConn() - for i := 0; i < size; i++ { - str := strconv.Itoa(i) - result := nodes[0].Exec(conn, utils.ToCmdLine("SET", str, str)) - asserts.AssertNotError(t, result) - } - for i := 0; i < size; i++ { - str := strconv.Itoa(i) - result := nodes[0].Exec(conn, utils.ToCmdLine("Get", str)) - asserts.AssertBulkReply(t, result, str) - } - for _, node := range nodes { - _ = node.asRaft().Close() - } -} - -func TestRaftElection(t *testing.T) { - addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"} - timeoutFlags := []bool{false, false, false} - persistFilenames := []string{"", "", ""} - nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames) - if err != nil { - t.Error(err) - return - } - nodes[0].asRaft().Close() - time.Sleep(3 * electionTimeoutMaxMs * time.Millisecond) // wait for leader timeout - //<-make(chan struct{}) // wait for leader timeout - for i := 0; i < 10; i++ { - leaderCount := 0 - for _, node := range nodes { - if node.asRaft().closed { - continue - } - switch node.asRaft().state { - case leader: - leaderCount++ - } - } - if leaderCount == 1 { - break - } else if leaderCount > 1 { - t.Errorf("get %d leaders, split brain", leaderCount) - break - } - time.Sleep(time.Second) - } -} - -func TestRaftPersist(t *testing.T) { - addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"} - timeoutFlags := []bool{false, false, false} - persistFilenames := []string{ - path.Join(config.Properties.Dir, "test6399.conf"), - path.Join(config.Properties.Dir, "test7379.conf"), - path.Join(config.Properties.Dir, "test7369.conf"), - } - nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames) - if err != nil { - t.Error(err) - return - } - node1 := nodes[0].asRaft() - err = node1.persist() - if err != nil { - t.Error(err) - return - } - for _, node := range nodes { - _ = node.asRaft().Close() - } - - err = node1.LoadConfigFile() - if err != nil { - t.Error(err) - return - } -} diff --git a/cluster/topo_utils.go b/cluster/topo_utils.go deleted file mode 100644 index 0c1017d..0000000 --- a/cluster/topo_utils.go +++ /dev/null @@ -1,99 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/datastruct/set" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" -) - -func (cluster *Cluster) isImportedKey(key string) bool { - slotId := getSlot(key) - cluster.slotMu.RLock() - slot := cluster.slots[slotId] - cluster.slotMu.RUnlock() - return slot.importedKeys.Has(key) -} - -func (cluster *Cluster) setImportedKey(key string) { - slotId := getSlot(key) - cluster.slotMu.Lock() - slot := cluster.slots[slotId] - cluster.slotMu.Unlock() - slot.importedKeys.Add(key) -} - -// initSlot init a slot when start as seed or import slot from other node -func (cluster *Cluster) initSlot(slotId uint32, state uint32) { - cluster.slotMu.Lock() - defer cluster.slotMu.Unlock() - cluster.slots[slotId] = &hostSlot{ - importedKeys: set.Make(), - keys: set.Make(), - state: state, - } -} - -func (cluster *Cluster) getHostSlot(slotId uint32) *hostSlot { - cluster.slotMu.RLock() - defer cluster.slotMu.RUnlock() - return cluster.slots[slotId] -} - -func (cluster *Cluster) finishSlotImport(slotID uint32) { - cluster.slotMu.Lock() - defer cluster.slotMu.Unlock() - slot := cluster.slots[slotID] - slot.state = slotStateHost - slot.importedKeys = nil - slot.oldNodeID = "" -} - -func (cluster *Cluster) setLocalSlotImporting(slotID uint32, oldNodeID string) { - cluster.slotMu.Lock() - defer cluster.slotMu.Unlock() - slot := cluster.slots[slotID] - if slot == nil { - slot = &hostSlot{ - importedKeys: set.Make(), - keys: set.Make(), - } - cluster.slots[slotID] = slot - } - slot.state = slotStateImporting - slot.oldNodeID = oldNodeID -} - -func (cluster *Cluster) setSlotMovingOut(slotID uint32, newNodeID string) { - cluster.slotMu.Lock() - defer cluster.slotMu.Unlock() - slot := cluster.slots[slotID] - if slot == nil { - slot = &hostSlot{ - importedKeys: set.Make(), - keys: set.Make(), - } - cluster.slots[slotID] = slot - } - slot.state = slotStateMovingOut - slot.newNodeID = newNodeID -} - -// cleanDroppedSlot deletes keys when slot has moved out or failed to import -func (cluster *Cluster) cleanDroppedSlot(slotID uint32) { - cluster.slotMu.RLock() - if cluster.slots[slotID] == nil { - cluster.slotMu.RUnlock() - return - } - keys := cluster.slots[slotID].importedKeys - cluster.slotMu.RUnlock() - c := connection.NewFakeConn() - go func() { - if keys != nil { - keys.ForEach(func(key string) bool { - cluster.db.Exec(c, utils.ToCmdLine("DEL", key)) - return true - }) - } - }() -} diff --git a/cluster/utils.go b/cluster/utils.go deleted file mode 100644 index 84982c3..0000000 --- a/cluster/utils.go +++ /dev/null @@ -1,75 +0,0 @@ -package cluster - -import ( - "github.com/hdt3213/godis/interface/redis" -) - -func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - return cluster.db.Exec(c, cmdLine) -} - -func info(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - return cluster.db.Exec(c, cmdLine) -} - -func randomkey(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - return cluster.db.Exec(c, cmdLine) -} - -/*----- utils -------*/ - -func makeArgs(cmd string, args ...string) [][]byte { - result := make([][]byte, len(args)+1) - result[0] = []byte(cmd) - for i, arg := range args { - result[i+1] = []byte(arg) - } - return result -} - -// return node -> writeKeys -func (cluster *Cluster) groupBy(keys []string) map[string][]string { - result := make(map[string][]string) - for _, key := range keys { - peer := cluster.pickNodeAddrByKey(key) - group, ok := result[peer] - if !ok { - group = make([]string, 0) - } - group = append(group, key) - result[peer] = group - } - return result -} - -// pickNode returns the node id hosting the given slot. -// If the slot is migrating, return the node which is importing the slot -func (cluster *Cluster) pickNode(slotID uint32) *Node { - // check cluster.slot to avoid errors caused by inconsistent status on follower nodes during raft commits - // see cluster.reBalance() - hSlot := cluster.getHostSlot(slotID) - if hSlot != nil { - switch hSlot.state { - case slotStateMovingOut: - return cluster.topology.GetNode(hSlot.newNodeID) - case slotStateImporting, slotStateHost: - return cluster.topology.GetNode(cluster.self) - } - } - - slot := cluster.topology.GetSlots()[int(slotID)] - node := cluster.topology.GetNode(slot.NodeID) - return node -} - -func (cluster *Cluster) pickNodeAddrByKey(key string) string { - slotId := getSlot(key) - return cluster.pickNode(slotId).Addr -} - -func modifyCmd(cmdLine CmdLine, newCmd string) CmdLine { - var cmdLine2 CmdLine - cmdLine2 = append(cmdLine2, cmdLine...) - cmdLine2[0] = []byte(newCmd) - return cmdLine2 -} diff --git a/cluster/utils_test.go b/cluster/utils_test.go deleted file mode 100644 index f222d7b..0000000 --- a/cluster/utils_test.go +++ /dev/null @@ -1,159 +0,0 @@ -package cluster - -import ( - "errors" - "github.com/hdt3213/godis/config" - database2 "github.com/hdt3213/godis/database" - "github.com/hdt3213/godis/datastruct/dict" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/idgenerator" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" - "math/rand" - "sync" -) - -type testClientFactory struct { - nodes []*Cluster - timeoutFlags []bool -} - -type testClient struct { - targetNode *Cluster - timeoutFlag *bool - conn redis.Connection -} - -func (cli *testClient) Send(cmdLine [][]byte) redis.Reply { - if *cli.timeoutFlag { - return protocol.MakeErrReply("ERR timeout") - } - return cli.targetNode.Exec(cli.conn, cmdLine) -} - -func (factory *testClientFactory) GetPeerClient(peerAddr string) (peerClient, error) { - for i, n := range factory.nodes { - if n.self == peerAddr { - cli := &testClient{ - targetNode: n, - timeoutFlag: &factory.timeoutFlags[i], - conn: connection.NewFakeConn(), - } - if config.Properties.RequirePass != "" { - cli.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass)) - } - return cli, nil - } - } - return nil, errors.New("peer not found") -} - -type mockStream struct { - targetNode *Cluster - ch <-chan *parser.Payload -} - -func (s *mockStream) Stream() <-chan *parser.Payload { - return s.ch -} - -func (s *mockStream) Close() error { - return nil -} - -func (factory *testClientFactory) NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error) { - for _, n := range factory.nodes { - if n.self == peerAddr { - conn := connection.NewFakeConn() - if config.Properties.RequirePass != "" { - n.Exec(conn, utils.ToCmdLine("AUTH", config.Properties.RequirePass)) - } - result := n.Exec(conn, cmdLine) - conn.Write(result.ToBytes()) - ch := parser.ParseStream(conn) - return &mockStream{ - targetNode: n, - ch: ch, - }, nil - } - } - return nil, errors.New("node not found") -} - -func (factory *testClientFactory) ReturnPeerClient(peer string, peerClient peerClient) error { - return nil -} - -func (factory *testClientFactory) Close() error { - return nil -} - -// mockClusterNodes creates a fake cluster for test -// timeoutFlags should have the same length as addresses, set timeoutFlags[i] == true could simulate addresses[i] timeout -func mockClusterNodes(addresses []string, timeoutFlags []bool) []*Cluster { - nodes := make([]*Cluster, len(addresses)) - // build fixedTopology - slots := make([]*Slot, slotCount) - nodeMap := make(map[string]*Node) - for _, addr := range addresses { - nodeMap[addr] = &Node{ - ID: addr, - Addr: addr, - Slots: nil, - } - } - for i := range slots { - addr := addresses[i%len(addresses)] - slots[i] = &Slot{ - ID: uint32(i), - NodeID: addr, - Flags: 0, - } - nodeMap[addr].Slots = append(nodeMap[addr].Slots, slots[i]) - } - factory := &testClientFactory{ - nodes: nodes, - timeoutFlags: timeoutFlags, - } - for i, addr := range addresses { - topo := &fixedTopology{ - mu: sync.RWMutex{}, - nodeMap: nodeMap, - slots: slots, - selfNodeID: addr, - } - nodes[i] = &Cluster{ - self: addr, - db: database2.NewStandaloneServer(), - transactions: dict.MakeSimple(), - idGenerator: idgenerator.MakeGenerator(config.Properties.Self), - topology: topo, - clientFactory: factory, - } - } - return nodes -} - -var addresses = []string{"127.0.0.1:6399", "127.0.0.1:7379"} -var timeoutFlags = []bool{false, false} -var testCluster = mockClusterNodes(addresses, timeoutFlags) - -func toArgs(cmd ...string) [][]byte { - args := make([][]byte, len(cmd)) - for i, s := range cmd { - args[i] = []byte(s) - } - return args -} - -var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") - -func RandString(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letters[rand.Intn(len(letters))] - } - return string(b) -} diff --git a/config/config.go b/config/config.go index 761372e..9f02e77 100644 --- a/config/config.go +++ b/config/config.go @@ -40,11 +40,12 @@ type ServerProperties struct { SlaveAnnouncePort int `cfg:"slave-announce-port"` SlaveAnnounceIP string `cfg:"slave-announce-ip"` ReplTimeout int `cfg:"repl-timeout"` - + ClusterEnable bool `cfg:"cluster-enable"` ClusterAsSeed bool `cfg:"cluster-as-seed"` ClusterSeed string `cfg:"cluster-seed"` - + RaftListenAddr string `cfg:"raft-listen-address"` + RaftAdvertiseAddr string `cfg:"raft-advertise-address"` // config file path CfPath string `cfg:"cf,omitempty"` } @@ -54,7 +55,10 @@ type ServerInfo struct { } func (p *ServerProperties) AnnounceAddress() string { - return p.AnnounceHost + ":" + strconv.Itoa(p.Port) + if p.AnnounceHost != "" { + return p.AnnounceHost + ":" + strconv.Itoa(p.Port) + } + return p.Bind + ":" + strconv.Itoa(p.Port) } // Properties holds global config properties