This commit is contained in:
hdt3213
2021-11-08 23:24:43 +08:00
parent 4163a45278
commit 38001a2d18
12 changed files with 41 additions and 8 deletions

View File

@@ -14,6 +14,7 @@ import (
"sync" "sync"
) )
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte type CmdLine = [][]byte
const ( const (
@@ -25,6 +26,7 @@ type payload struct {
dbIndex int dbIndex int
} }
// Handler receive msgs from channel and write to AOF file
type Handler struct { type Handler struct {
db database.EmbedDB db database.EmbedDB
tmpDBMaker func() database.EmbedDB tmpDBMaker func() database.EmbedDB
@@ -38,6 +40,7 @@ type Handler struct {
currentDB int currentDB int
} }
// NewAOFHandler creates a new aof.Handler
func NewAOFHandler(db database.EmbedDB, tmpDBMaker func() database.EmbedDB) (*Handler, error) { func NewAOFHandler(db database.EmbedDB, tmpDBMaker func() database.EmbedDB) (*Handler, error) {
handler := &Handler{} handler := &Handler{}
handler.aofFilename = config.Properties.AppendFilename handler.aofFilename = config.Properties.AppendFilename
@@ -144,6 +147,7 @@ func (handler *Handler) LoadAof(maxBytes int) {
} }
} }
// Close gracefully stops aof persistence procedure
func (handler *Handler) Close() { func (handler *Handler) Close() {
if handler.aofFile != nil { if handler.aofFile != nil {
close(handler.aofChan) close(handler.aofChan)

View File

@@ -20,12 +20,14 @@ func (handler *Handler) newRewriteHandler() *Handler {
return h return h
} }
type rewriteCtx struct { // RewriteCtx holds context of an AOF rewriting procedure
type RewriteCtx struct {
tmpFile *os.File tmpFile *os.File
fileSize int64 fileSize int64
dbIdx int // selected db index when startRewrite dbIdx int // selected db index when startRewrite
} }
// Rewrite carries out AOF rewrite
func (handler *Handler) Rewrite() { func (handler *Handler) Rewrite() {
ctx, err := handler.StartRewrite() ctx, err := handler.StartRewrite()
if err != nil { if err != nil {
@@ -41,8 +43,9 @@ func (handler *Handler) Rewrite() {
handler.FinishRewrite(ctx) handler.FinishRewrite(ctx)
} }
// DoRewrite actually rewrite aof file, returns // DoRewrite actually rewrite aof file
func (handler *Handler) DoRewrite(ctx *rewriteCtx) error { // makes DoRewrite public for testing only, please use Rewrite instead
func (handler *Handler) DoRewrite(ctx *RewriteCtx) error {
tmpFile := ctx.tmpFile tmpFile := ctx.tmpFile
// load aof tmpFile // load aof tmpFile
@@ -76,7 +79,7 @@ func (handler *Handler) DoRewrite(ctx *rewriteCtx) error {
} }
// StartRewrite prepares rewrite procedure // StartRewrite prepares rewrite procedure
func (handler *Handler) StartRewrite() (*rewriteCtx, error) { func (handler *Handler) StartRewrite() (*RewriteCtx, error) {
handler.pausingAof.Lock() // pausing aof handler.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer handler.pausingAof.Unlock()
@@ -96,7 +99,7 @@ func (handler *Handler) StartRewrite() (*rewriteCtx, error) {
logger.Warn("tmp file create failed") logger.Warn("tmp file create failed")
return nil, err return nil, err
} }
return &rewriteCtx{ return &RewriteCtx{
tmpFile: file, tmpFile: file,
fileSize: filesize, fileSize: filesize,
dbIdx: handler.currentDB, dbIdx: handler.currentDB,
@@ -104,7 +107,7 @@ func (handler *Handler) StartRewrite() (*rewriteCtx, error) {
} }
// FinishRewrite finish rewrite procedure // FinishRewrite finish rewrite procedure
func (handler *Handler) FinishRewrite(ctx *rewriteCtx) { func (handler *Handler) FinishRewrite(ctx *RewriteCtx) {
handler.pausingAof.Lock() // pausing aof handler.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer handler.pausingAof.Unlock()

View File

@@ -2,6 +2,7 @@ package cluster
import "github.com/hdt3213/godis/interface/redis" import "github.com/hdt3213/godis/interface/redis"
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte type CmdLine = [][]byte
func makeRouter() map[string]CmdFunc { func makeRouter() map[string]CmdFunc {

View File

@@ -1,4 +1,4 @@
// Package godis is a memory database with redis compatible interface // Package database is a memory database with redis compatible interface
package database package database
import ( import (
@@ -278,6 +278,7 @@ func (db *DB) addVersion(keys ...string) {
} }
} }
// GetVersion returns version code for given key
func (db *DB) GetVersion(key string) uint32 { func (db *DB) GetVersion(key string) uint32 {
entity, ok := db.versionMap.Get(key) entity, ok := db.versionMap.Get(key)
if !ok { if !ok {
@@ -286,6 +287,7 @@ func (db *DB) GetVersion(key string) uint32 {
return entity.(uint32) return entity.(uint32)
} }
// ForEach traverses all the keys in the database
func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration *time.Time) bool) { func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
db.data.ForEach(func(key string, raw interface{}) bool { db.data.ForEach(func(key string, raw interface{}) bool {
entity, _ := raw.(*database.DataEntity) entity, _ := raw.(*database.DataEntity)

View File

@@ -16,6 +16,7 @@ import (
"time" "time"
) )
// MultiDB is a set of multiple database set
type MultiDB struct { type MultiDB struct {
dbSet []*DB dbSet []*DB
@@ -157,6 +158,7 @@ func (mdb *MultiDB) flushAll() redis.Reply {
return &reply.OkReply{} return &reply.OkReply{}
} }
// ForEach traverses all the keys in the given database
func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) { func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
if dbIndex >= len(mdb.dbSet) { if dbIndex >= len(mdb.dbSet) {
return return
@@ -165,6 +167,7 @@ func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.Data
db.ForEach(cb) db.ForEach(cb)
} }
// ExecMulti executes multi commands transaction Atomically and Isolated
func (mdb *MultiDB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply { func (mdb *MultiDB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
if conn.GetDBIndex() >= len(mdb.dbSet) { if conn.GetDBIndex() >= len(mdb.dbSet) {
return reply.MakeErrReply("ERR DB index is out of range") return reply.MakeErrReply("ERR DB index is out of range")
@@ -173,6 +176,7 @@ func (mdb *MultiDB) ExecMulti(conn redis.Connection, watching map[string]uint32,
return db.ExecMulti(conn, watching, cmdLines) return db.ExecMulti(conn, watching, cmdLines)
} }
// RWLocks lock keys for writing and reading
func (mdb *MultiDB) RWLocks(dbIndex int, writeKeys []string, readKeys []string) { func (mdb *MultiDB) RWLocks(dbIndex int, writeKeys []string, readKeys []string) {
if dbIndex >= len(mdb.dbSet) { if dbIndex >= len(mdb.dbSet) {
panic("ERR DB index is out of range") panic("ERR DB index is out of range")
@@ -181,6 +185,7 @@ func (mdb *MultiDB) RWLocks(dbIndex int, writeKeys []string, readKeys []string)
db.RWLocks(writeKeys, readKeys) db.RWLocks(writeKeys, readKeys)
} }
// RWUnLocks unlock keys for writing and reading
func (mdb *MultiDB) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) { func (mdb *MultiDB) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) {
if dbIndex >= len(mdb.dbSet) { if dbIndex >= len(mdb.dbSet) {
panic("ERR DB index is out of range") panic("ERR DB index is out of range")
@@ -189,6 +194,7 @@ func (mdb *MultiDB) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string
db.RWUnLocks(writeKeys, readKeys) db.RWUnLocks(writeKeys, readKeys)
} }
// GetUndoLogs return rollback commands
func (mdb *MultiDB) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine { func (mdb *MultiDB) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
if dbIndex >= len(mdb.dbSet) { if dbIndex >= len(mdb.dbSet) {
panic("ERR DB index is out of range") panic("ERR DB index is out of range")
@@ -197,6 +203,7 @@ func (mdb *MultiDB) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
return db.GetUndoLogs(cmdLine) return db.GetUndoLogs(cmdLine)
} }
// ExecWithLock executes normal commands, invoker should provide locks
func (mdb *MultiDB) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply { func (mdb *MultiDB) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply {
if conn.GetDBIndex() >= len(mdb.dbSet) { if conn.GetDBIndex() >= len(mdb.dbSet) {
panic("ERR DB index is out of range") panic("ERR DB index is out of range")
@@ -211,6 +218,7 @@ func BGRewriteAOF(db *MultiDB, args [][]byte) redis.Reply {
return reply.MakeStatusReply("Background append only file rewriting started") return reply.MakeStatusReply("Background append only file rewriting started")
} }
// RewriteAOF start Append-Only-File rewriting and blocked until it finished
func RewriteAOF(db *MultiDB, args [][]byte) redis.Reply { func RewriteAOF(db *MultiDB, args [][]byte) redis.Reply {
db.aofHandler.Rewrite() db.aofHandler.Rewrite()
return reply.MakeStatusReply("Background append only file rewriting started") return reply.MakeStatusReply("Background append only file rewriting started")

View File

@@ -287,6 +287,7 @@ func (dict *ConcurrentDict) RandomDistinctKeys(limit int) []string {
return arr return arr
} }
// Clear removes all keys in dict
func (dict *ConcurrentDict) Clear() { func (dict *ConcurrentDict) Clear() {
*dict = *MakeConcurrent(dict.shardCount) *dict = *MakeConcurrent(dict.shardCount)
} }

View File

@@ -115,6 +115,7 @@ func (dict *SimpleDict) RandomDistinctKeys(limit int) []string {
return result return result
} }
// Clear removes all keys in dict
func (dict *SimpleDict) Clear() { func (dict *SimpleDict) Clear() {
*dict = *MakeSimple() *dict = *MakeSimple()
} }

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
) )
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte type CmdLine = [][]byte
// DB is the interface for redis style storage engine // DB is the interface for redis style storage engine

View File

@@ -9,6 +9,7 @@ func ToCmdLine(cmd ...string) [][]byte {
return args return args
} }
// ToCmdLine2 convert commandName and string-type argument to [][]byte
func ToCmdLine2(commandName string, args ...string) [][]byte { func ToCmdLine2(commandName string, args ...string) [][]byte {
result := make([][]byte, len(args)+1) result := make([][]byte, len(args)+1)
result[0] = []byte(commandName) result[0] = []byte(commandName)
@@ -18,6 +19,7 @@ func ToCmdLine2(commandName string, args ...string) [][]byte {
return result return result
} }
// ToCmdLine3 convert commandName and []byte-type argument to CmdLine
func ToCmdLine3(commandName string, args ...[]byte) [][]byte { func ToCmdLine3(commandName string, args ...[]byte) [][]byte {
result := make([][]byte, len(args)+1) result := make([][]byte, len(args)+1)
result[0] = []byte(commandName) result[0] = []byte(commandName)

View File

@@ -27,7 +27,7 @@ func (i *item) contains(c byte) bool {
min uint8 = 255 min uint8 = 255
max uint8 = 0 max uint8 = 0
) )
for k, _ := range i.set { for k := range i.set {
if min > k { if min > k {
min = k min = k
} }

View File

@@ -119,10 +119,12 @@ func (c *Connection) GetPassword() string {
return c.password return c.password
} }
// InMultiState tells is connection in an uncommitted transaction
func (c *Connection) InMultiState() bool { func (c *Connection) InMultiState() bool {
return c.multiState return c.multiState
} }
// SetMultiState sets transaction flag
func (c *Connection) SetMultiState(state bool) { func (c *Connection) SetMultiState(state bool) {
if !state { // reset data when cancel multi if !state { // reset data when cancel multi
c.watching = nil c.watching = nil
@@ -131,18 +133,22 @@ func (c *Connection) SetMultiState(state bool) {
c.multiState = state c.multiState = state
} }
// GetQueuedCmdLine returns queued commands of current transaction
func (c *Connection) GetQueuedCmdLine() [][][]byte { func (c *Connection) GetQueuedCmdLine() [][][]byte {
return c.queue return c.queue
} }
// EnqueueCmd enqueues command of current transaction
func (c *Connection) EnqueueCmd(cmdLine [][]byte) { func (c *Connection) EnqueueCmd(cmdLine [][]byte) {
c.queue = append(c.queue, cmdLine) c.queue = append(c.queue, cmdLine)
} }
// ClearQueuedCmds clears queued commands of current transaction
func (c *Connection) ClearQueuedCmds() { func (c *Connection) ClearQueuedCmds() {
c.queue = nil c.queue = nil
} }
// GetWatching returns watching keys and their version code when started watching
func (c *Connection) GetWatching() map[string]uint32 { func (c *Connection) GetWatching() map[string]uint32 {
if c.watching == nil { if c.watching == nil {
c.watching = make(map[string]uint32) c.watching = make(map[string]uint32)
@@ -150,10 +156,12 @@ func (c *Connection) GetWatching() map[string]uint32 {
return c.watching return c.watching
} }
// GetDBIndex returns selected db
func (c *Connection) GetDBIndex() int { func (c *Connection) GetDBIndex() int {
return c.selectedDB return c.selectedDB
} }
// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) { func (c *Connection) SelectDB(dbNum int) {
c.selectedDB = dbNum c.selectedDB = dbNum
} }

View File

@@ -22,6 +22,7 @@ func (r *OkReply) ToBytes() []byte {
var theOkReply = new(OkReply) var theOkReply = new(OkReply)
// MakeOkReply returns a ok reply
func MakeOkReply() *OkReply { func MakeOkReply() *OkReply {
return theOkReply return theOkReply
} }
@@ -78,6 +79,7 @@ func (r *QueuedReply) ToBytes() []byte {
var theQueuedReply = new(QueuedReply) var theQueuedReply = new(QueuedReply)
// MakeQueuedReply returns a QUEUED reply
func MakeQueuedReply() *QueuedReply { func MakeQueuedReply() *QueuedReply {
return theQueuedReply return theQueuedReply
} }