Mirror of https://github.com/HDT3213/godis.git (synced 2025-10-26 18:21:05 +08:00)
reformat code
@@ -1,45 +1,45 @@
package cluster

import (
    "context"
    "errors"
    "github.com/HDT3213/godis/src/redis/client"
    "github.com/jolestar/go-commons-pool/v2"
)

type ConnectionFactory struct {
    Peer string
}

func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
    c, err := client.MakeClient(f.Peer)
    if err != nil {
        return nil, err
    }
    c.Start()
    return pool.NewPooledObject(c), nil
}

func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
    c, ok := object.Object.(*client.Client)
    if !ok {
        return errors.New("type mismatch")
    }
    c.Close()
    return nil
}

func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
    // do validate
    return true
}

func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
    // do activate
    return nil
}

func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
    // do passivate
    return nil
}
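Note (not part of the commit): the factory above plugs into go-commons-pool v2, which calls MakeObject/DestroyObject/ValidateObject as connections are borrowed and returned. A minimal usage sketch, assuming it sits in the same cluster package next to the factory and that NewObjectPoolWithDefaultConfig, BorrowObject and ReturnObject are the go-commons-pool v2 entry points; borrowExample is a hypothetical helper:

func borrowExample(peer string) error {
    ctx := context.Background()
    // Build a pool around the factory; MakeObject is invoked lazily on borrow.
    connPool := pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{Peer: peer})
    raw, err := connPool.BorrowObject(ctx)
    if err != nil {
        return err
    }
    c, ok := raw.(*client.Client)
    if !ok {
        return errors.New("type mismatch")
    }
    _ = c // send commands through the borrowed client here
    // Hand the connection back so other callers can reuse it.
    return connPool.ReturnObject(ctx, raw)
}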
@@ -1,99 +1,99 @@
package cluster

import (
    "github.com/HDT3213/godis/src/interface/redis"
    "github.com/HDT3213/godis/src/redis/reply"
    "strconv"
)

func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
    }
    keys := make([]string, len(args)-1)
    for i := 1; i < len(args); i++ {
        keys[i-1] = string(args[i])
    }
    groupMap := cluster.groupBy(keys)
    if len(groupMap) == 1 { // do fast
        for peer, group := range groupMap { // only one group
            return cluster.Relay(peer, c, makeArgs("DEL", group...))
        }
    }
    // prepare
    var errReply redis.Reply
    txId := cluster.idGenerator.NextId()
    txIdStr := strconv.FormatInt(txId, 10)
    rollback := false
    for peer, group := range groupMap {
        args := []string{txIdStr}
        args = append(args, group...)
        var resp redis.Reply
        if peer == cluster.self {
            resp = PrepareDel(cluster, c, makeArgs("PrepareDel", args...))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("PrepareDel", args...))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp
            rollback = true
            break
        }
    }
    var respList []redis.Reply
    if rollback {
        // rollback
        RequestRollback(cluster, c, txId, groupMap)
    } else {
        // commit
        respList, errReply = RequestCommit(cluster, c, txId, groupMap)
        if errReply != nil {
            rollback = true
        }
    }
    if !rollback {
        var deleted int64 = 0
        for _, resp := range respList {
            intResp := resp.(*reply.IntReply)
            deleted += intResp.Code
        }
        return reply.MakeIntReply(int64(deleted))
    }
    return errReply
}

// args: PrepareDel id keys...
func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command")
    }
    txId := string(args[1])
    keys := make([]string, 0, len(args)-2)
    for i := 2; i < len(args); i++ {
        arg := args[i]
        keys = append(keys, string(arg))
    }
    txArgs := makeArgs("DEL", keys...) // actual args for cluster.db
    tx := NewTransaction(cluster, c, txId, txArgs, keys)
    cluster.transactions.Put(txId, tx)
    err := tx.prepare()
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return &reply.OkReply{}
}

// invoker should provide lock
func CommitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
    keys := make([]string, len(tx.args))
    for i, v := range tx.args {
        keys[i] = string(v)
    }
    keys = keys[1:]

    deleted := cluster.db.Removes(keys...)
    if deleted > 0 {
        cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
    }
    return reply.MakeIntReply(int64(deleted))
}
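Note (not part of the commit): Del buckets the requested keys by owning node via cluster.groupBy, takes the single-node fast path when only one group exists, and otherwise drives the prepare/commit/rollback protocol per group. A minimal sketch of that grouping step under an assumed picker interface; keyPicker and groupKeysByPeer are illustrative names, not the package's actual types:

// Hypothetical sketch: group keys by the peer that owns them.
type keyPicker interface {
    Get(key string) string // returns the address of the node responsible for key
}

func groupKeysByPeer(picker keyPicker, keys []string) map[string][]string {
    groups := make(map[string][]string)
    for _, key := range keys {
        peer := picker.Get(key)
        groups[peer] = append(groups[peer], key)
    }
    return groups
}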
@@ -1,85 +1,85 @@
package idgenerator

import (
    "hash/fnv"
    "log"
    "sync"
    "time"
)

const (
    workerIdBits     int64 = 5
    datacenterIdBits int64 = 5
    sequenceBits     int64 = 12

    maxWorkerId     int64 = -1 ^ (-1 << uint64(workerIdBits))
    maxDatacenterId int64 = -1 ^ (-1 << uint64(datacenterIdBits))
    maxSequence     int64 = -1 ^ (-1 << uint64(sequenceBits))

    timeLeft uint8 = 22
    dataLeft uint8 = 17
    workLeft uint8 = 12

    twepoch int64 = 1525705533000
)

type IdGenerator struct {
    mu           *sync.Mutex
    lastStamp    int64
    workerId     int64
    dataCenterId int64
    sequence     int64
}

func MakeGenerator(cluster string, node string) *IdGenerator {
    fnv64 := fnv.New64()
    _, _ = fnv64.Write([]byte(cluster))
    dataCenterId := int64(fnv64.Sum64())

    fnv64.Reset()
    _, _ = fnv64.Write([]byte(node))
    workerId := int64(fnv64.Sum64())

    return &IdGenerator{
        mu:           &sync.Mutex{},
        lastStamp:    -1,
        dataCenterId: dataCenterId,
        workerId:     workerId,
        sequence:     1,
    }
}

func (w *IdGenerator) getCurrentTime() int64 {
    return time.Now().UnixNano() / 1e6
}

func (w *IdGenerator) NextId() int64 {
    w.mu.Lock()
    defer w.mu.Unlock()

    timestamp := w.getCurrentTime()
    if timestamp < w.lastStamp {
        log.Fatal("can not generate id")
    }
    if w.lastStamp == timestamp {
        w.sequence = (w.sequence + 1) & maxSequence
        if w.sequence == 0 {
            for timestamp <= w.lastStamp {
                timestamp = w.getCurrentTime()
            }
        }
    } else {
        w.sequence = 0
    }
    w.lastStamp = timestamp

    return ((timestamp - twepoch) << timeLeft) | (w.dataCenterId << dataLeft) | (w.workerId << workLeft) | w.sequence
}

func (w *IdGenerator) tilNextMillis() int64 {
    timestamp := w.getCurrentTime()
    if timestamp <= w.lastStamp {
        timestamp = w.getCurrentTime()
    }
    return timestamp
}
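Note (not part of the commit): NextId packs a millisecond timestamp relative to twepoch into bits 22 and up, the data-center hash into bits 17-21, the worker hash into bits 12-16, and the per-millisecond sequence into bits 0-11. The sketch below composes and splits an ID under the assumption that the worker and data-center values are masked to their declared 5-bit widths; composeId and splitId are hypothetical helpers (MakeGenerator itself uses unmasked FNV-64 hashes):

// Hypothetical helpers illustrating the bit layout produced by NextId.
func composeId(timestamp, dataCenterId, workerId, sequence int64) int64 {
    return ((timestamp - twepoch) << timeLeft) |
        ((dataCenterId & maxDatacenterId) << dataLeft) |
        ((workerId & maxWorkerId) << workLeft) |
        (sequence & maxSequence)
}

func splitId(id int64) (timestamp, dataCenterId, workerId, sequence int64) {
    sequence = id & maxSequence
    workerId = (id >> workLeft) & maxWorkerId
    dataCenterId = (id >> dataLeft) & maxDatacenterId
    timestamp = (id >> timeLeft) + twepoch
    return
}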
@@ -1,159 +1,159 @@
package cluster

import (
    "fmt"
    "github.com/HDT3213/godis/src/db"
    "github.com/HDT3213/godis/src/interface/redis"
    "github.com/HDT3213/godis/src/redis/reply"
    "strconv"
)

func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
    }
    keys := make([]string, len(args)-1)
    for i := 1; i < len(args); i++ {
        keys[i-1] = string(args[i])
    }

    resultMap := make(map[string][]byte)
    groupMap := cluster.groupBy(keys)
    for peer, group := range groupMap {
        resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
        if reply.IsErrorReply(resp) {
            errReply := resp.(reply.ErrorReply)
            return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
        }
        arrReply, _ := resp.(*reply.MultiBulkReply)
        for i, v := range arrReply.Args {
            key := group[i]
            resultMap[key] = v
        }
    }
    result := make([][]byte, len(keys))
    for i, k := range keys {
        result[i] = resultMap[k]
    }
    return reply.MakeMultiBulkReply(result)
}

// args: PrepareMSet id keys...
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")
    }
    txId := string(args[1])
    size := (len(args) - 2) / 2
    keys := make([]string, size)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i+2])
    }

    txArgs := [][]byte{
        []byte("MSet"),
    } // actual args for cluster.db
    txArgs = append(txArgs, args[2:]...)
    tx := NewTransaction(cluster, c, txId, txArgs, keys)
    cluster.transactions.Put(txId, tx)
    err := tx.prepare()
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return &reply.OkReply{}
}

// invoker should provide lock
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
    size := len(tx.args) / 2
    keys := make([]string, size)
    values := make([][]byte, size)
    for i := 0; i < size; i++ {
        keys[i] = string(tx.args[2*i+1])
        values[i] = tx.args[2*i+2]
    }
    for i, key := range keys {
        value := values[i]
        cluster.db.Put(key, &db.DataEntity{Data: value})
    }
    cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
    return &reply.OkReply{}
}

func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    argCount := len(args) - 1
    if argCount%2 != 0 || argCount < 1 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
    }

    size := argCount / 2
    keys := make([]string, size)
    valueMap := make(map[string]string)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i+1])
        valueMap[keys[i]] = string(args[2*i+2])
    }

    groupMap := cluster.groupBy(keys)
    if len(groupMap) == 1 { // do fast
        for peer := range groupMap {
            return cluster.Relay(peer, c, args)
        }
    }

    //prepare
    var errReply redis.Reply
    txId := cluster.idGenerator.NextId()
    txIdStr := strconv.FormatInt(txId, 10)
    rollback := false
    for peer, group := range groupMap {
        peerArgs := []string{txIdStr}
        for _, k := range group {
            peerArgs = append(peerArgs, k, valueMap[k])
        }
        var resp redis.Reply
        if peer == cluster.self {
            resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp
            rollback = true
            break
        }
    }
    if rollback {
        // rollback
        RequestRollback(cluster, c, txId, groupMap)
    } else {
        _, errReply = RequestCommit(cluster, c, txId, groupMap)
        rollback = errReply != nil
    }
    if !rollback {
        return &reply.OkReply{}
    }
    return errReply
}

func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    argCount := len(args) - 1
    if argCount%2 != 0 || argCount < 1 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
    }
    var peer string
    size := argCount / 2
    for i := 0; i < size; i++ {
        key := string(args[2*i])
        currentPeer := cluster.peerPicker.Get(key)
        if peer == "" {
            peer = currentPeer
        } else {
            if peer != currentPeer {
                return reply.MakeErrReply("ERR msetnx must within one slot in cluster mode")
            }
        }
    }
    return cluster.Relay(peer, c, args)
}
@@ -1,40 +1,39 @@
package cluster

import (
    "github.com/HDT3213/godis/src/interface/redis"
    "github.com/HDT3213/godis/src/redis/reply"
)

// TODO: support multiplex slots
func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
    }
    src := string(args[1])
    dest := string(args[2])

    srcPeer := cluster.peerPicker.Get(src)
    destPeer := cluster.peerPicker.Get(dest)

    if srcPeer != destPeer {
        return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
    }
    return cluster.Relay(srcPeer, c, args)
}

func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
    }
    src := string(args[1])
    dest := string(args[2])

    srcPeer := cluster.peerPicker.Get(src)
    destPeer := cluster.peerPicker.Get(dest)

    if srcPeer != destPeer {
        return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
    }
    return cluster.Relay(srcPeer, c, args)
}
@@ -3,106 +3,106 @@ package cluster
import "github.com/HDT3213/godis/src/interface/redis"

func MakeRouter() map[string]CmdFunc {
    routerMap := make(map[string]CmdFunc)
    routerMap["ping"] = Ping

    routerMap["commit"] = Commit
    routerMap["rollback"] = Rollback
    routerMap["del"] = Del
    routerMap["preparedel"] = PrepareDel
    routerMap["preparemset"] = PrepareMSet

    routerMap["expire"] = defaultFunc
    routerMap["expireat"] = defaultFunc
    routerMap["pexpire"] = defaultFunc
    routerMap["pexpireat"] = defaultFunc
    routerMap["ttl"] = defaultFunc
    routerMap["pttl"] = defaultFunc
    routerMap["persist"] = defaultFunc
    routerMap["exists"] = defaultFunc
    routerMap["type"] = defaultFunc
    routerMap["rename"] = Rename
    routerMap["renamenx"] = RenameNx

    routerMap["set"] = defaultFunc
    routerMap["setnx"] = defaultFunc
    routerMap["setex"] = defaultFunc
    routerMap["psetex"] = defaultFunc
    routerMap["mset"] = MSet
    routerMap["mget"] = MGet
    routerMap["msetnx"] = MSetNX
    routerMap["get"] = defaultFunc
    routerMap["getset"] = defaultFunc
    routerMap["incr"] = defaultFunc
    routerMap["incrby"] = defaultFunc
    routerMap["incrbyfloat"] = defaultFunc
    routerMap["decr"] = defaultFunc
    routerMap["decrby"] = defaultFunc

    routerMap["lpush"] = defaultFunc
    routerMap["lpushx"] = defaultFunc
    routerMap["rpush"] = defaultFunc
    routerMap["rpushx"] = defaultFunc
    routerMap["lpop"] = defaultFunc
    routerMap["rpop"] = defaultFunc
    //routerMap["rpoplpush"] = RPopLPush
    routerMap["lrem"] = defaultFunc
    routerMap["llen"] = defaultFunc
    routerMap["lindex"] = defaultFunc
    routerMap["lset"] = defaultFunc
    routerMap["lrange"] = defaultFunc

    routerMap["hset"] = defaultFunc
    routerMap["hsetnx"] = defaultFunc
    routerMap["hget"] = defaultFunc
    routerMap["hexists"] = defaultFunc
    routerMap["hdel"] = defaultFunc
    routerMap["hlen"] = defaultFunc
    routerMap["hmget"] = defaultFunc
    routerMap["hmset"] = defaultFunc
    routerMap["hkeys"] = defaultFunc
    routerMap["hvals"] = defaultFunc
    routerMap["hgetall"] = defaultFunc
    routerMap["hincrby"] = defaultFunc
    routerMap["hincrbyfloat"] = defaultFunc

    routerMap["sadd"] = defaultFunc
    routerMap["sismember"] = defaultFunc
    routerMap["srem"] = defaultFunc
    routerMap["scard"] = defaultFunc
    routerMap["smembers"] = defaultFunc
    routerMap["sinter"] = defaultFunc
    routerMap["sinterstore"] = defaultFunc
    routerMap["sunion"] = defaultFunc
    routerMap["sunionstore"] = defaultFunc
    routerMap["sdiff"] = defaultFunc
    routerMap["sdiffstore"] = defaultFunc
    routerMap["srandmember"] = defaultFunc

    routerMap["zadd"] = defaultFunc
    routerMap["zscore"] = defaultFunc
    routerMap["zincrby"] = defaultFunc
    routerMap["zrank"] = defaultFunc
    routerMap["zcount"] = defaultFunc
    routerMap["zrevrank"] = defaultFunc
    routerMap["zcard"] = defaultFunc
    routerMap["zrange"] = defaultFunc
    routerMap["zrevrange"] = defaultFunc
    routerMap["zrangebyscore"] = defaultFunc
    routerMap["zrevrangebyscore"] = defaultFunc
    routerMap["zrem"] = defaultFunc
    routerMap["zremrangebyscore"] = defaultFunc
    routerMap["zremrangebyrank"] = defaultFunc

    //routerMap["flushdb"] = FlushDB
    //routerMap["flushall"] = FlushAll
    //routerMap["keys"] = Keys

    return routerMap
}

func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    key := string(args[1])
    peer := cluster.peerPicker.Get(key)
    return cluster.Relay(peer, c, args)
}
@@ -1,188 +1,188 @@
package cluster

import (
    "context"
    "fmt"
    "github.com/HDT3213/godis/src/db"
    "github.com/HDT3213/godis/src/interface/redis"
    "github.com/HDT3213/godis/src/lib/marshal/gob"
    "github.com/HDT3213/godis/src/redis/reply"
    "strconv"
    "strings"
    "time"
)

type Transaction struct {
    id      string   // transaction id
    args    [][]byte // cmd args
    cluster *Cluster
    conn    redis.Connection

    keys    []string          // related keys
    undoLog map[string][]byte // store data for undoLog

    lockUntil time.Time
    ctx       context.Context
    cancel    context.CancelFunc
    status    int8
}

const (
    maxLockTime = 3 * time.Second

    CreatedStatus    = 0
    PreparedStatus   = 1
    CommitedStatus   = 2
    RollbackedStatus = 3
)

func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]byte, keys []string) *Transaction {
    return &Transaction{
        id:      id,
        args:    args,
        cluster: cluster,
        conn:    c,
        keys:    keys,
        status:  CreatedStatus,
    }
}

// t should contains Keys field
func (tx *Transaction) prepare() error {
    // lock keys
    tx.cluster.db.Locks(tx.keys...)

    // use context to manage
    //tx.lockUntil = time.Now().Add(maxLockTime)
    //ctx, cancel := context.WithDeadline(context.Background(), tx.lockUntil)
    //tx.ctx = ctx
    //tx.cancel = cancel

    // build undoLog
    tx.undoLog = make(map[string][]byte)
    for _, key := range tx.keys {
        entity, ok := tx.cluster.db.Get(key)
        if ok {
            blob, err := gob.Marshal(entity)
            if err != nil {
                return err
            }
            tx.undoLog[key] = blob
        } else {
            tx.undoLog[key] = []byte{} // entity was nil, should be removed while rollback
        }
    }
    tx.status = PreparedStatus
    return nil
}

func (tx *Transaction) rollback() error {
    for key, blob := range tx.undoLog {
        if len(blob) > 0 {
            entity := &db.DataEntity{}
            err := gob.UnMarshal(blob, entity)
            if err != nil {
                return err
            }
            tx.cluster.db.Put(key, entity)
        } else {
            tx.cluster.db.Remove(key)
        }
    }
    if tx.status != CommitedStatus {
        tx.cluster.db.UnLocks(tx.keys...)
    }
    tx.status = RollbackedStatus
    return nil
}

// rollback local transaction
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
    }
    txId := string(args[1])
    raw, ok := cluster.transactions.Get(txId)
    if !ok {
        return reply.MakeIntReply(0)
    }
    tx, _ := raw.(*Transaction)
    err := tx.rollback()
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return reply.MakeIntReply(1)
}

// commit local transaction as a worker
func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
    }
    txId := string(args[1])
    raw, ok := cluster.transactions.Get(txId)
    if !ok {
        return reply.MakeIntReply(0)
    }
    tx, _ := raw.(*Transaction)

    // finish transaction
    defer func() {
        cluster.db.UnLocks(tx.keys...)
        tx.status = CommitedStatus
        //cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit
    }()

    cmd := strings.ToLower(string(tx.args[0]))
    var result redis.Reply
    if cmd == "del" {
        result = CommitDel(cluster, c, tx)
    } else if cmd == "mset" {
        result = CommitMSet(cluster, c, tx)
    }

    if reply.IsErrorReply(result) {
        // failed
        err2 := tx.rollback()
        return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
    }

    return result
}

// request all node commit transaction as leader
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
    var errReply reply.ErrorReply
    txIdStr := strconv.FormatInt(txId, 10)
    respList := make([]redis.Reply, 0, len(peers))
    for peer := range peers {
        var resp redis.Reply
        if peer == cluster.self {
            resp = Commit(cluster, c, makeArgs("commit", txIdStr))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp.(reply.ErrorReply)
            break
        }
        respList = append(respList, resp)
    }
    if errReply != nil {
        RequestRollback(cluster, c, txId, peers)
        return nil, errReply
    }
    return respList, nil
}

// request all node rollback transaction as leader
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {
    txIdStr := strconv.FormatInt(txId, 10)
    for peer := range peers {
        if peer == cluster.self {
            Rollback(cluster, c, makeArgs("rollback", txIdStr))
        } else {
            cluster.Relay(peer, c, makeArgs("rollback", txIdStr))
        }
    }
}
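Note (not part of the commit): Prepare*, Commit and Rollback together form a small two-phase (TCC-style) protocol. The coordinator asks every involved node to prepare (lock keys and record an undo log), rolls every node back if any prepare or commit fails, and commits everywhere otherwise. Below is a self-contained sketch of that coordinator loop; Participant, coordinate and the in-memory maps are hypothetical stand-ins, not godis types:

package main

import (
    "errors"
    "fmt"
)

// Participant is a hypothetical stand-in for one cluster node.
type Participant struct {
    prepared map[string][]string // txId -> keys "locked" for that transaction
}

func (p *Participant) Prepare(txID string, keys []string) error {
    if p.prepared == nil {
        p.prepared = make(map[string][]string)
    }
    p.prepared[txID] = keys // godis also locks the keys and builds an undo log here
    return nil
}

func (p *Participant) Commit(txID string) error {
    if _, ok := p.prepared[txID]; !ok {
        return errors.New("unknown transaction")
    }
    delete(p.prepared, txID) // apply the command and release the locks
    return nil
}

func (p *Participant) Rollback(txID string) {
    delete(p.prepared, txID) // restore the undo log and release the locks
}

// coordinate mirrors Del/MSet: prepare on every peer, roll all contacted peers
// back on the first failure, otherwise commit everywhere.
func coordinate(txID string, groups map[*Participant][]string) error {
    contacted := make([]*Participant, 0, len(groups))
    for p, keys := range groups {
        contacted = append(contacted, p)
        if err := p.Prepare(txID, keys); err != nil {
            for _, q := range contacted {
                q.Rollback(txID)
            }
            return err
        }
    }
    for _, p := range contacted {
        if err := p.Commit(txID); err != nil {
            for _, q := range contacted {
                q.Rollback(txID)
            }
            return err
        }
    }
    return nil
}

func main() {
    a, b := &Participant{}, &Participant{}
    err := coordinate("42", map[*Participant][]string{a: {"k1"}, b: {"k2"}})
    fmt.Println("commit error:", err)
}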