refactor dict

This commit is contained in:
hdt3213
2019-12-23 20:15:16 +08:00
committed by wyb
parent a54bac52f3
commit 96b70d16d5
4 changed files with 50 additions and 498 deletions

View File

@@ -7,66 +7,30 @@ import (
) )
type Dict struct { type Dict struct {
table atomic.Value // []*Shard table []*Shard
nextTable []*Shard
nextTableMu sync.Mutex
count int32 count int32
// -1: no rehashing in progress
// >=0 && < tableSize: table[rehashIndex] is rehashing
// >= tableSize: rehashing progress is finishing
rehashIndex int32
} }
type Shard struct { type Shard struct {
head *Node m map[string]interface{}
mutex sync.RWMutex mutex sync.RWMutex
} }
type Node struct {
key string
val interface{}
next *Node
hashCode uint32
}
func Make(shardCount int) *Dict {
const ( if shardCount < 1 {
maxCapacity = 1 << 15 shardCount = 16
minCapacity = 16
rehashConcurrent = 4
loadFactor = 0.75
)
// return the mini 2^n which is not less than cap
func computeCapacity(param int) (size int) {
if param <= minCapacity {
return minCapacity
} }
n := param - 1
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
if n < 0 || n >= maxCapacity {
return maxCapacity
} else {
return int(n + 1)
}
}
func Make(shardCountHint int) *Dict {
shardCount := computeCapacity(shardCountHint)
table := make([]*Shard, shardCount) table := make([]*Shard, shardCount)
for i := 0; i < shardCount; i++ { for i := 0; i < shardCount; i++ {
table[i] = &Shard{} table[i] = &Shard{
m: make(map[string]interface{}),
}
} }
d := &Dict{ d := &Dict{
count: 0, count: 0,
rehashIndex: -1, table: table,
} }
d.table.Store(table)
return d return d
} }
@@ -85,8 +49,7 @@ func (dict *Dict) spread(hashCode uint32) uint32 {
if dict == nil { if dict == nil {
panic("dict is nil") panic("dict is nil")
} }
table, _ := dict.table.Load().([]*Shard) tableSize := uint32(len(dict.table))
tableSize := uint32(len(table))
return (tableSize - 1) & uint32(hashCode) return (tableSize - 1) & uint32(hashCode)
} }
@@ -94,74 +57,7 @@ func (dict *Dict) getShard(index uint32) *Shard {
if dict == nil { if dict == nil {
panic("dict is nil") panic("dict is nil")
} }
table, ok := dict.table.Load().([]*Shard) return dict.table[index]
if !ok {
panic("load table failed")
}
return table[index]
}
func (dict *Dict) getNextShard(hashCode uint32) *Shard {
if dict == nil {
panic("dict is nil")
}
// in case next table has been released during get shard
dict.nextTableMu.Lock()
defer dict.nextTableMu.Unlock()
// rehashing may be in progress or rehashing has finished while waiting nextTableMu
if dict.nextTable == nil {
return nil
}
nextTableSize := uint32(len(dict.nextTable))
index := (nextTableSize - 1) & uint32(hashCode)
return dict.nextTable[index]
}
func (dict *Dict) ensureNextTable() {
if dict.nextTable == nil {
dict.nextTableMu.Lock()
// check-lock-check
if dict.nextTable == nil {
table, _ := dict.table.Load().([]*Shard)
tableSize := uint32(len(table))
// init next table
nextShardCount := tableSize * 2
if nextShardCount > maxCapacity || nextShardCount < 0 {
nextShardCount = maxCapacity
}
if nextShardCount <= tableSize {
// reach limit, cannot resize
atomic.StoreInt32(&dict.rehashIndex, -1)
return
}
nextTable := make([]*Shard, nextShardCount)
var i uint32
for i = 0; i < nextShardCount; i++ {
nextTable[i] = &Shard{}
}
dict.nextTable = nextTable
}
dict.nextTableMu.Unlock()
}
}
func (shard *Shard) Get(key string) (val interface{}, exists bool) {
if shard == nil {
panic("shard is nil")
}
node := shard.head
for node != nil {
if node.key == key {
return node.val, true
}
node = node.next
}
return nil, false
} }
func (dict *Dict) Get(key string) (val interface{}, exists bool) { func (dict *Dict) Get(key string) (val interface{}, exists bool) {
@@ -172,34 +68,8 @@ func (dict *Dict) Get(key string) (val interface{}, exists bool) {
index := dict.spread(hashCode) index := dict.spread(hashCode)
shard := dict.getShard(index) shard := dict.getShard(index)
shard.mutex.RLock() shard.mutex.RLock()
defer shard.mutex.RUnlock()
rehashIndex := atomic.LoadInt32(&dict.rehashIndex) val, exists = shard.m[key]
if rehashIndex >= int32(index) {
/*
* if rehashIndex > index. then the shard has finished resize, put in next table
* if rehashIndex == index, the shard may be resizing or just finished.
* Resizing will not be finished until the lock has been released
*/
shard.mutex.RUnlock()
nextShard := dict.getNextShard(hashCode)
if nextShard == nil {
shard := dict.getShard(dict.spread(hashCode))
shard.mutex.RLock()
val, exists = shard.Get(key)
shard.mutex.RUnlock()
} else {
nextShard.mutex.RLock()
val, exists = nextShard.Get(key)
nextShard.mutex.RUnlock()
}
} else {
/*
* if rehashing not in progress or the shard has not been rehashing, put in current shard
*/
val, exists = shard.Get(key)
shard.mutex.RUnlock()
}
return return
} }
@@ -210,42 +80,6 @@ func (dict *Dict) Len() int {
return int(atomic.LoadInt32(&dict.count)) return int(atomic.LoadInt32(&dict.count))
} }
func (shard *Shard) Put(key string, val interface{}, hashCode uint32) int {
if shard == nil {
panic("shard is nil")
}
node := shard.head
if node == nil {
// empty shard
node = &Node{
key: key,
val: val,
hashCode: hashCode,
}
shard.head = node
return 1
} else {
for {
if node.key == key {
// existed node
node.val = val
return 0
}
if node.next == nil {
// append
node.next = &Node{
key: key,
val: val,
hashCode: hashCode,
}
return 1
}
node = node.next
}
}
}
// return the number of new inserted key-value // return the number of new inserted key-value
func (dict *Dict) Put(key string, val interface{}) (result int) { func (dict *Dict) Put(key string, val interface{}) (result int) {
if dict == nil { if dict == nil {
@@ -255,67 +89,16 @@ func (dict *Dict) Put(key string, val interface{}) (result int) {
index := dict.spread(hashCode) index := dict.spread(hashCode)
shard := dict.getShard(index) shard := dict.getShard(index)
shard.mutex.Lock() shard.mutex.Lock()
defer shard.mutex.Unlock()
rehashIndex := atomic.LoadInt32(&dict.rehashIndex) if _, ok := shard.m[key]; ok {
if rehashIndex >= int32(index) { shard.m[key] = val
/* if rehashIndex > index. then the shard has finished resize, put in next table
* if rehashIndex == index, the shard may be resizing or just finished.
* Resizing will not be finished until the lock has been released
*/
shard.mutex.Unlock()
nextShard := dict.getNextShard(hashCode)
if nextShard == nil {
shard := dict.getShard(dict.spread(hashCode))
shard.mutex.Lock()
result = shard.Put(key, val, hashCode)
shard.mutex.Unlock()
} else {
nextShard.mutex.Lock()
result = nextShard.Put(key, val, hashCode)
nextShard.mutex.Unlock()
}
} else {
/*
* if rehashing not in progress or the shard has not been rehashing, put in current shard
*/
result = shard.Put(key, val, hashCode)
shard.mutex.Unlock()
}
if result == 1 {
dict.addCount()
}
return result
}
func (shard *Shard) PutIfAbsent(key string, val interface{}, hashCode uint32)int {
node := shard.head
if node == nil {
// empty shard
node = &Node{
key: key,
val: val,
hashCode: hashCode,
}
shard.head = node
return 1
} else {
for {
if node.key == key {
// existed node
return 0 return 0
} } else {
if node.next == nil { shard.m[key] = val
// append dict.addCount()
node.next = &Node{
key: key,
val: val,
hashCode: hashCode,
}
return 1 return 1
} }
node = node.next
}
}
} }
// return the number of updated key-value // return the number of updated key-value
@@ -327,46 +110,18 @@ func (dict *Dict) PutIfAbsent(key string, val interface{}) (result int) {
index := dict.spread(hashCode) index := dict.spread(hashCode)
shard := dict.getShard(index) shard := dict.getShard(index)
shard.mutex.Lock() shard.mutex.Lock()
rehashIndex := atomic.LoadInt32(&dict.rehashIndex) defer shard.mutex.Unlock()
if rehashIndex >= int32(index) {
shard.mutex.Unlock() if _, ok := shard.m[key]; ok {
nextShard := dict.getNextShard(hashCode) return 0
if nextShard == nil {
shard := dict.getShard(dict.spread(hashCode))
shard.mutex.Lock()
result = shard.PutIfAbsent(key, val, hashCode)
shard.mutex.Unlock()
} else { } else {
nextShard.mutex.Lock() shard.m[key] = val
result = nextShard.PutIfAbsent(key, val, hashCode)
nextShard.mutex.Unlock()
}
} else {
result = shard.PutIfAbsent(key, val, hashCode)
shard.mutex.Unlock()
}
if result == 1 {
dict.addCount() dict.addCount()
}
return result
}
func (shard *Shard) PutIfExists(key string, val interface{})int {
if shard == nil {
panic("shard is nil")
}
node := shard.head
for node != nil {
if node.key == key {
node.val = val
return 1 return 1
} }
node = node.next
}
return 0
} }
// return the number of updated key-value // return the number of updated key-value
func (dict *Dict) PutIfExists(key string, val interface{})(result int) { func (dict *Dict) PutIfExists(key string, val interface{})(result int) {
if dict == nil { if dict == nil {
@@ -376,54 +131,15 @@ func (dict *Dict) PutIfExists(key string, val interface{})(result int) {
index := dict.spread(hashCode) index := dict.spread(hashCode)
shard := dict.getShard(index) shard := dict.getShard(index)
shard.mutex.Lock() shard.mutex.Lock()
rehashIndex := atomic.LoadInt32(&dict.rehashIndex) defer shard.mutex.Unlock()
if rehashIndex >= int32(index) {
shard.mutex.Unlock()
nextShard := dict.getNextShard(hashCode)
if nextShard == nil {
shard := dict.getShard(dict.spread(hashCode))
shard.mutex.Lock()
result = shard.PutIfExists(key, val)
shard.mutex.Unlock()
} else {
nextShard.mutex.Lock()
result = nextShard.PutIfExists(key, val)
nextShard.mutex.Unlock()
}
} else {
result = shard.PutIfExists(key, val)
shard.mutex.Unlock()
}
return
}
func (shard *Shard) Remove(key string) int { if _, ok := shard.m[key]; ok {
if shard == nil { shard.m[key] = val
panic("shard is nil")
}
node := shard.head
if node == nil {
// empty shard
return 0
} else if node.key == key {
// remove first node
shard.head = node.next
return 1 return 1
} else { } else {
prev := node
node = node.next
for node != nil {
if node.key == key {
prev.next = node.next
return 1
}
prev = node
node = node.next
}
}
return 0 return 0
} }
}
// return the number of deleted key-value // return the number of deleted key-value
func (dict *Dict) Remove(key string)(result int) { func (dict *Dict) Remove(key string)(result int) {
@@ -434,144 +150,23 @@ func (dict *Dict) Remove(key string)(result int) {
index := dict.spread(hashCode) index := dict.spread(hashCode)
shard := dict.getShard(index) shard := dict.getShard(index)
shard.mutex.Lock() shard.mutex.Lock()
defer shard.mutex.Unlock()
rehashIndex := atomic.LoadInt32(&dict.rehashIndex) if _, ok := shard.m[key]; ok {
if rehashIndex >= int32(index) { delete(shard.m, key)
shard.mutex.Unlock() return 1
nextShard := dict.getNextShard(hashCode)
if nextShard == nil {
shard := dict.getShard(dict.spread(hashCode))
shard.mutex.Lock()
result = shard.Remove(key)
shard.mutex.Unlock()
} else { } else {
nextShard.mutex.Lock() return 0
result = nextShard.Remove(key)
nextShard.mutex.Unlock()
}
} else {
shard := dict.getShard(dict.spread(hashCode))
result = shard.Remove(key)
shard.mutex.Unlock()
}
if result > 0 {
atomic.AddInt32(&dict.count, -1)
} }
return return
} }
func (dict *Dict) addCount() int32 { func (dict *Dict) addCount() int32 {
count := atomic.AddInt32(&dict.count, 1) return atomic.AddInt32(&dict.count, 1)
table, _ := dict.table.Load().([]*Shard)
if float64(count) >= float64(len(table))*loadFactor {
dict.resize()
}
return count
}
func (dict *Dict) resize() {
if !atomic.CompareAndSwapInt32(&dict.rehashIndex, -1, 0) {
// resize already in progress
return
}
dict.ensureNextTable()
var wg sync.WaitGroup
wg.Add(rehashConcurrent)
for i := 0; i < rehashConcurrent; i++ {
go dict.transfer(&wg)
}
wg.Wait()
// finish rehash
dict.nextTableMu.Lock()
dict.table.Store(dict.nextTable)
dict.nextTable = nil
atomic.StoreInt32(&dict.rehashIndex, -1)
dict.nextTableMu.Unlock()
}
func (dict *Dict) transfer(wg *sync.WaitGroup) {
table, _ := dict.table.Load().([]*Shard)
tableSize := uint32(len(table))
// dict.rehashIndex must >= 0
for {
i := uint32(atomic.AddInt32(&dict.rehashIndex, 1)) - 1
if i >= tableSize {
wg.Done()
return
}
shard := dict.getShard(i)
shard.mutex.RLock()
nextShard0 := dict.nextTable[i]
nextShard1 := dict.nextTable[i+tableSize]
nextShard0.mutex.RLock()
nextShard1.mutex.RLock()
var head0, head1 *Node
var tail0, tail1 *Node
node := shard.head
for node != nil {
// split current shard to 2 shards in next table
if node.hashCode&tableSize == 0 {
if head0 == nil {
head0 = node
} else {
tail0.next = node
}
tail0 = node
} else {
if head1 == nil {
head1 = node
} else {
tail1.next = node
}
tail1 = node
}
node = node.next
}
if tail0 != nil {
tail0.next = nil
nextShard0.head = head0
}
if tail1 != nil {
tail1.next = nil
nextShard1.head = head1
}
nextShard1.mutex.RUnlock()
nextShard0.mutex.RUnlock()
shard.mutex.RUnlock()
}
} }
type Consumer func(key string, val interface{})bool type Consumer func(key string, val interface{})bool
func (shard *Shard) ForEach(consumer Consumer)bool {
if shard == nil {
panic("shard is nil")
}
shard.mutex.RLock()
defer shard.mutex.RUnlock()
node := shard.head
for node != nil {
toContinue := consumer(node.key, node.val)
if !toContinue {
return false
}
node = node.next
}
return true
}
/* /*
* may not contains new entry inserted during traversal * may not contains new entry inserted during traversal
*/ */
@@ -579,43 +174,14 @@ func (dict *Dict)ForEach(consumer Consumer) {
if dict == nil { if dict == nil {
panic("dict is nil") panic("dict is nil")
} }
table, ok := dict.table.Load().([]*Shard)
if !ok {
panic("load table failed")
}
var rehashIndex int32 for _, shard := range dict.table {
tableSize := len(table) for key, value := range shard.m {
for index, shard := range table { shard.mutex.RLock()
rehashIndex = atomic.LoadInt32(&dict.rehashIndex) continues := consumer(key, value)
if rehashIndex >= int32(index) { shard.mutex.RUnlock()
// current slot has rehashed if !continues {
if dict.nextTable == nil { return
// rehash has finished, traver current table
// local variable `table` will not change to nextTable
if !shard.ForEach(consumer) {
break
}
}
i0 := index
nextShard0 := dict.nextTable[i0]
if nextShard0 != nil {
if !nextShard0.ForEach(consumer) {
break
}
}
i1 := index + tableSize
nextShard1 := dict.nextTable[i1]
if nextShard1 != nil {
if !nextShard1.ForEach(consumer) {
break
}
}
} else {
if !shard.ForEach(consumer) {
break
} }
} }
} }
@@ -643,30 +209,18 @@ func (shard *Shard)RandomKey()string {
shard.mutex.RLock() shard.mutex.RLock()
defer shard.mutex.RUnlock() defer shard.mutex.RUnlock()
keys := make([]string, 0) for key := range shard.m {
i := 0 return key
node := shard.head
for node != nil {
if node.key != "" {
keys = append(keys, node.key)
i++
} }
node = node.next
}
if i > 0 {
return keys[rand.Intn(i)]
} else {
return "" return ""
} }
}
func (dict *Dict)RandomKeys(limit int)[]string { func (dict *Dict)RandomKeys(limit int)[]string {
size := dict.Len() size := dict.Len()
if limit >= size { if limit >= size {
return dict.Keys() return dict.Keys()
} }
table, _ := dict.table.Load().([]*Shard) shardCount := len(dict.table)
shardCount := len(table)
result := make([]string, limit) result := make([]string, limit)
for i := 0; i < limit; { for i := 0; i < limit; {
@@ -689,8 +243,7 @@ func (dict *Dict)RandomDistinctKeys(limit int)[]string {
return dict.Keys() return dict.Keys()
} }
table, _ := dict.table.Load().([]*Shard) shardCount := len(dict.table)
shardCount := len(table)
result := make(map[string]bool) result := make(map[string]bool)
for len(result) < limit { for len(result) < limit {
shardIndex := uint32(rand.Intn(shardCount)) shardIndex := uint32(rand.Intn(shardCount))

View File

@@ -67,7 +67,6 @@ func (locks *Locks)UnLock(key string) {
func (locks *Locks)RUnLock(key string) { func (locks *Locks)RUnLock(key string) {
index := locks.spread(fnv32(key)) index := locks.spread(fnv32(key))
mu := locks.table[index] mu := locks.table[index]
mu.Lock()
mu.RUnlock() mu.RUnlock()
} }

View File

@@ -46,12 +46,12 @@ var router = MakeRouter()
func MakeDB() *DB { func MakeDB() *DB {
db := &DB{ db := &DB{
Data: dict.Make(1024), Data: dict.Make(128),
TTLMap: dict.Make(128), TTLMap: dict.Make(64),
Locker: lock.Make(128), Locker: lock.Make(128),
interval: 5 * time.Second, interval: 5 * time.Second,
subs: dict.Make(16), subs: dict.Make(4),
subsLocker: lock.Make(16), subsLocker: lock.Make(16),
} }
db.TimerTask() db.TimerTask()

View File

@@ -27,7 +27,7 @@ func (db *DB) getOrInitDict(key string)(dict *Dict.Dict, inited bool, errReply r
} }
inited = false inited = false
if dict == nil { if dict == nil {
dict = Dict.Make(0) dict = Dict.Make(1)
db.Data.Put(key, &DataEntity{ db.Data.Put(key, &DataEntity{
Data: dict, Data: dict,
}) })