diff --git a/cluster/cluster.go b/cluster/cluster.go index 9ff4a20..b4edadc 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,7 +5,7 @@ import ( "os" "path" - _ "github.com/hdt3213/godis/cluster/commands" // register nodes + _ "github.com/hdt3213/godis/cluster/commands" // register commands "github.com/hdt3213/godis/cluster/core" "github.com/hdt3213/godis/cluster/raft" "github.com/hdt3213/godis/config" diff --git a/cluster/commands/mset.go b/cluster/commands/mset.go new file mode 100644 index 0000000..5884b55 --- /dev/null +++ b/cluster/commands/mset.go @@ -0,0 +1,74 @@ +package commands + +import ( + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/protocol" +) + +func init() { + core.RegisterCmd("mset_", execMSet_) +} + +type CmdLine = [][]byte + +// execMSet_ executes msets in local node +func execMSet_(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 3 { + return protocol.MakeArgNumErrReply("mset") + } + cmdLine[0] = []byte("mset") + return cluster.LocalExec(c, cmdLine) +} + +func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap map[string][]string) { + rollbackCmd := utils.ToCmdLine("rollback", txId) + for node := range routeMap { + cluster.Relay(node, c, rollbackCmd) + } +} + +// execMSetSlow execute mset through tcc +func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, routeMap map[string][]string) redis.Reply { + txId := utils.RandString(6) + + keyValues := make(map[string][]byte) + for i := 1; i < len(cmdLine); i += 2 { + key := string(cmdLine[i]) + value := cmdLine[i+1] + keyValues[key] = value + } + + // make prepare requests + nodePrepareCmdMap := make(map[string]CmdLine) + for node, keys := range routeMap { + prepareCmd := utils.ToCmdLine("prepare", txId, "mset") + for _, key := range keys { + value := keyValues[key] + prepareCmd = append(prepareCmd, []byte(key), value) + } + nodePrepareCmdMap[node] = prepareCmd + } + + // send prepare request + for node, prepareCmd := range nodePrepareCmdMap { + reply := cluster.Relay(node, c, prepareCmd) + if protocol.IsErrorReply(reply) { + requestRollback(cluster, c, txId, routeMap) + return protocol.MakeErrReply("prepare failed") + } + } + + // send commit request + commiteCmd := utils.ToCmdLine("commit", txId) + for node := range nodePrepareCmdMap { + reply := cluster.Relay(node, c, commiteCmd) + if protocol.IsErrorReply(reply) { + requestRollback(cluster, c, txId, routeMap) + return protocol.MakeErrReply("commit failed") + } + } + + return protocol.MakeOkReply() +} diff --git a/cluster/core/core.go b/cluster/core/core.go index 05e3209..84ce223 100644 --- a/cluster/core/core.go +++ b/cluster/core/core.go @@ -21,6 +21,7 @@ type Cluster struct { slotsManager *slotsManager rebalanceManger *rebalanceManager + transactions *TransactionManager } type Config struct { @@ -127,6 +128,7 @@ func NewCluster(cfg *Config) (*Cluster, error) { config: cfg, rebalanceManger: newRebalanceManager(), slotsManager: newSlotsManager(), + transactions: newTransactionManager(), } cluster.injectInsertCallback() cluster.injectDeleteCallback() diff --git a/cluster/core/migration.go b/cluster/core/migration.go index 23672b3..27bb3c9 100644 --- a/cluster/core/migration.go +++ b/cluster/core/migration.go @@ -152,7 +152,6 @@ func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) red } logger.Infof("finishing migration task %s, got task info", taskId) - // transport dirty keys within lock, lock will be released while migration done var lockedSlots []uint32 defer func() { @@ -171,7 +170,6 @@ func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) red } logger.Infof("finishing migration task %s, dirty keys transported", taskId) - // propose migrate finish leaderConn, err := cluster.BorrowLeaderClient() if err != nil { diff --git a/cluster/core/tcc.go b/cluster/core/tcc.go new file mode 100644 index 0000000..6162de9 --- /dev/null +++ b/cluster/core/tcc.go @@ -0,0 +1,128 @@ +package core + +import ( + "sync" + + "github.com/hdt3213/godis/database" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/redis/protocol" +) + +type TransactionManager struct { + txs map[string]*TCC + mu sync.RWMutex +} + +type TCC struct { + realCmdLine CmdLine + undoLogs []CmdLine + writeKeys []string + readKeys []string +} + +func newTransactionManager() *TransactionManager { + return &TransactionManager{ + txs: make(map[string]*TCC), + } +} + +func init() { + RegisterCmd("prepare", execPrepare) + RegisterCmd("commit", execCommit) + RegisterCmd("rollback", execRollback) + +} + +// execPrepare executes prepare command +// commandline: prepare txid realCmd realArgs... +// execPrepare will check transaction preconditions, lock related keys and prepare undo logs +func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 3 { + return protocol.MakeArgNumErrReply("prepare") + } + txId := string(cmdLine[1]) + realCmdLine := cmdLine[2:] + + // create transaction + cluster.transactions.mu.Lock() + tx := cluster.transactions.txs[txId] + if tx != nil { + cluster.transactions.mu.Unlock() + return protocol.MakeErrReply("transction existed") + } + tx = &TCC{} + cluster.transactions.txs[txId] = tx + cluster.transactions.mu.Unlock() + + // todo: pre-execute check + + // prepare lock and undo locks + tx.writeKeys, tx.readKeys = database.GetRelatedKeys(realCmdLine) + cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys) + tx.undoLogs = cluster.db.GetUndoLogs(0, realCmdLine) + tx.realCmdLine = realCmdLine + + return protocol.MakeOkReply() +} + +func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 2 { + return protocol.MakeArgNumErrReply("commit") + } + txId := string(cmdLine[1]) + + cluster.transactions.mu.Lock() + tx := cluster.transactions.txs[txId] + if tx == nil { + cluster.transactions.mu.Unlock() + return protocol.MakeErrReply("transction not found") + } + cluster.transactions.mu.Unlock() + + resp := cluster.db.Exec(c, tx.realCmdLine) + + // unlock regardless of result + cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys) + + if protocol.IsErrorReply(resp) { + // do not delete transaction, waiting rollback + return resp + } + // todo delete transaction after deadline + // cluster.transactions.mu.Lock() + // delete(cluster.transactions.txs, txId) + // cluster.transactions.mu.Unlock() + + return resp +} + +func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 2 { + return protocol.MakeArgNumErrReply("rollback") + } + txId := string(cmdLine[1]) + + // get transaction + cluster.transactions.mu.Lock() + tx := cluster.transactions.txs[txId] + if tx == nil { + cluster.transactions.mu.Unlock() + return protocol.MakeErrReply("transction not found") + } + cluster.transactions.mu.Unlock() + + // rollback + cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys) + for i := len(tx.undoLogs) - 1; i >= 0; i-- { + cmdline := tx.undoLogs[i] + cluster.db.Exec(c, cmdline) + } + cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys) + + // delete transaction + cluster.transactions.mu.Lock() + delete(cluster.transactions.txs, txId) + cluster.transactions.mu.Unlock() + + return protocol.MakeOkReply() +} diff --git a/cluster/core/utils.go b/cluster/core/utils.go index f85ac89..01e55d6 100644 --- a/cluster/core/utils.go +++ b/cluster/core/utils.go @@ -34,6 +34,11 @@ func (cluster *Cluster) Relay(peerId string, c redis.Connection, cmdLine [][]byt return cli.Send(cmdLine) } +// LocalExec executes command at local node +func (cluster *Cluster) LocalExec(c redis.Connection, cmdLine [][]byte) redis.Reply { + return cluster.db.Exec(c, cmdLine) +} + // GetPartitionKey extract hashtag func GetPartitionKey(key string) string { beg := strings.Index(key, "{") @@ -65,4 +70,4 @@ func execRaftCommittedIndex(cluster *Cluster, c redis.Connection, cmdLine CmdLin return protocol.MakeErrReply(err.Error()) } return protocol.MakeIntReply(int64(index)) -} \ No newline at end of file +} diff --git a/database/transaction.go b/database/transaction.go index d91a38b..8556fa6 100644 --- a/database/transaction.go +++ b/database/transaction.go @@ -164,6 +164,7 @@ func (db *DB) GetUndoLogs(cmdLine [][]byte) []CmdLine { } // GetRelatedKeys analysis related keys +// returns related write keys and read keys func GetRelatedKeys(cmdLine [][]byte) ([]string, []string) { cmdName := strings.ToLower(string(cmdLine[0])) cmd, ok := cmdTable[cmdName]