diff --git a/src/datastruct/dict/dict.go b/src/datastruct/dict/dict.go index eef2463..ccd6c65 100644 --- a/src/datastruct/dict/dict.go +++ b/src/datastruct/dict/dict.go @@ -7,66 +7,30 @@ import ( ) type Dict struct { - table atomic.Value // []*Shard - nextTable []*Shard - nextTableMu sync.Mutex + table []*Shard count int32 - - // -1: no rehashing in progress - // >=0 && < tableSize: table[rehashIndex] is rehashing - // >= tableSize: rehashing progress is finishing - rehashIndex int32 } type Shard struct { - head *Node + m map[string]interface{} mutex sync.RWMutex } -type Node struct { - key string - val interface{} - next *Node - hashCode uint32 -} - -const ( - maxCapacity = 1 << 15 - minCapacity = 16 - rehashConcurrent = 4 - loadFactor = 0.75 -) - -// return the mini 2^n which is not less than cap -func computeCapacity(param int) (size int) { - if param <= minCapacity { - return minCapacity +func Make(shardCount int) *Dict { + if shardCount < 1 { + shardCount = 16 } - n := param - 1 - n |= n >> 1 - n |= n >> 2 - n |= n >> 4 - n |= n >> 8 - n |= n >> 16 - if n < 0 || n >= maxCapacity { - return maxCapacity - } else { - return int(n + 1) - } -} - -func Make(shardCountHint int) *Dict { - shardCount := computeCapacity(shardCountHint) table := make([]*Shard, shardCount) for i := 0; i < shardCount; i++ { - table[i] = &Shard{} + table[i] = &Shard{ + m: make(map[string]interface{}), + } } d := &Dict{ count: 0, - rehashIndex: -1, + table: table, } - d.table.Store(table) return d } @@ -85,8 +49,7 @@ func (dict *Dict) spread(hashCode uint32) uint32 { if dict == nil { panic("dict is nil") } - table, _ := dict.table.Load().([]*Shard) - tableSize := uint32(len(table)) + tableSize := uint32(len(dict.table)) return (tableSize - 1) & uint32(hashCode) } @@ -94,74 +57,7 @@ func (dict *Dict) getShard(index uint32) *Shard { if dict == nil { panic("dict is nil") } - table, ok := dict.table.Load().([]*Shard) - if !ok { - panic("load table failed") - } - return table[index] -} - -func (dict *Dict) getNextShard(hashCode uint32) *Shard { - if dict == nil { - panic("dict is nil") - } - - // in case next table has been released during get shard - dict.nextTableMu.Lock() - defer dict.nextTableMu.Unlock() - - // rehashing may be in progress or rehashing has finished while waiting nextTableMu - if dict.nextTable == nil { - return nil - } - nextTableSize := uint32(len(dict.nextTable)) - index := (nextTableSize - 1) & uint32(hashCode) - return dict.nextTable[index] -} - -func (dict *Dict) ensureNextTable() { - if dict.nextTable == nil { - dict.nextTableMu.Lock() - - // check-lock-check - if dict.nextTable == nil { - table, _ := dict.table.Load().([]*Shard) - tableSize := uint32(len(table)) - // init next table - nextShardCount := tableSize * 2 - if nextShardCount > maxCapacity || nextShardCount < 0 { - nextShardCount = maxCapacity - } - if nextShardCount <= tableSize { - // reach limit, cannot resize - atomic.StoreInt32(&dict.rehashIndex, -1) - return - } - nextTable := make([]*Shard, nextShardCount) - var i uint32 - for i = 0; i < nextShardCount; i++ { - nextTable[i] = &Shard{} - } - dict.nextTable = nextTable - } - - dict.nextTableMu.Unlock() - } -} - -func (shard *Shard) Get(key string) (val interface{}, exists bool) { - if shard == nil { - panic("shard is nil") - } - - node := shard.head - for node != nil { - if node.key == key { - return node.val, true - } - node = node.next - } - return nil, false + return dict.table[index] } func (dict *Dict) Get(key string) (val interface{}, exists bool) { @@ -172,34 +68,8 @@ func (dict *Dict) Get(key string) (val interface{}, exists bool) { index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.RLock() - - rehashIndex := atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - /* - * if rehashIndex > index. then the shard has finished resize, put in next table - * if rehashIndex == index, the shard may be resizing or just finished. - * Resizing will not be finished until the lock has been released - */ - shard.mutex.RUnlock() - nextShard := dict.getNextShard(hashCode) - if nextShard == nil { - shard := dict.getShard(dict.spread(hashCode)) - shard.mutex.RLock() - val, exists = shard.Get(key) - shard.mutex.RUnlock() - } else { - nextShard.mutex.RLock() - val, exists = nextShard.Get(key) - nextShard.mutex.RUnlock() - } - } else { - /* - * if rehashing not in progress or the shard has not been rehashing, put in current shard - */ - val, exists = shard.Get(key) - shard.mutex.RUnlock() - } - + defer shard.mutex.RUnlock() + val, exists = shard.m[key] return } @@ -210,42 +80,6 @@ func (dict *Dict) Len() int { return int(atomic.LoadInt32(&dict.count)) } -func (shard *Shard) Put(key string, val interface{}, hashCode uint32) int { - if shard == nil { - panic("shard is nil") - } - - node := shard.head - if node == nil { - // empty shard - node = &Node{ - key: key, - val: val, - hashCode: hashCode, - } - shard.head = node - return 1 - } else { - for { - if node.key == key { - // existed node - node.val = val - return 0 - } - if node.next == nil { - // append - node.next = &Node{ - key: key, - val: val, - hashCode: hashCode, - } - return 1 - } - node = node.next - } - } -} - // return the number of new inserted key-value func (dict *Dict) Put(key string, val interface{}) (result int) { if dict == nil { @@ -255,66 +89,15 @@ func (dict *Dict) Put(key string, val interface{}) (result int) { index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.Lock() + defer shard.mutex.Unlock() - rehashIndex := atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - /* if rehashIndex > index. then the shard has finished resize, put in next table - * if rehashIndex == index, the shard may be resizing or just finished. - * Resizing will not be finished until the lock has been released - */ - shard.mutex.Unlock() - nextShard := dict.getNextShard(hashCode) - if nextShard == nil { - shard := dict.getShard(dict.spread(hashCode)) - shard.mutex.Lock() - result = shard.Put(key, val, hashCode) - shard.mutex.Unlock() - } else { - nextShard.mutex.Lock() - result = nextShard.Put(key, val, hashCode) - nextShard.mutex.Unlock() - } + if _, ok := shard.m[key]; ok { + shard.m[key] = val + return 0 } else { - /* - * if rehashing not in progress or the shard has not been rehashing, put in current shard - */ - result = shard.Put(key, val, hashCode) - shard.mutex.Unlock() - } - if result == 1 { + shard.m[key] = val dict.addCount() - } - return result -} - -func (shard *Shard) PutIfAbsent(key string, val interface{}, hashCode uint32)int { - node := shard.head - if node == nil { - // empty shard - node = &Node{ - key: key, - val: val, - hashCode: hashCode, - } - shard.head = node return 1 - } else { - for { - if node.key == key { - // existed node - return 0 - } - if node.next == nil { - // append - node.next = &Node{ - key: key, - val: val, - hashCode: hashCode, - } - return 1 - } - node = node.next - } } } @@ -327,45 +110,17 @@ func (dict *Dict) PutIfAbsent(key string, val interface{}) (result int) { index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.Lock() - rehashIndex := atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - shard.mutex.Unlock() - nextShard := dict.getNextShard(hashCode) - if nextShard == nil { - shard := dict.getShard(dict.spread(hashCode)) - shard.mutex.Lock() - result = shard.PutIfAbsent(key, val, hashCode) - shard.mutex.Unlock() - } else { - nextShard.mutex.Lock() - result = nextShard.PutIfAbsent(key, val, hashCode) - nextShard.mutex.Unlock() - } + defer shard.mutex.Unlock() + + if _, ok := shard.m[key]; ok { + return 0 } else { - result = shard.PutIfAbsent(key, val, hashCode) - shard.mutex.Unlock() - } - if result == 1 { + shard.m[key] = val dict.addCount() + return 1 } - return result } -func (shard *Shard) PutIfExists(key string, val interface{})int { - if shard == nil { - panic("shard is nil") - } - - node := shard.head - for node != nil { - if node.key == key { - node.val = val - return 1 - } - node = node.next - } - return 0 -} // return the number of updated key-value func (dict *Dict) PutIfExists(key string, val interface{})(result int) { @@ -376,53 +131,14 @@ func (dict *Dict) PutIfExists(key string, val interface{})(result int) { index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.Lock() - rehashIndex := atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - shard.mutex.Unlock() - nextShard := dict.getNextShard(hashCode) - if nextShard == nil { - shard := dict.getShard(dict.spread(hashCode)) - shard.mutex.Lock() - result = shard.PutIfExists(key, val) - shard.mutex.Unlock() - } else { - nextShard.mutex.Lock() - result = nextShard.PutIfExists(key, val) - nextShard.mutex.Unlock() - } - } else { - result = shard.PutIfExists(key, val) - shard.mutex.Unlock() - } - return -} + defer shard.mutex.Unlock() -func (shard *Shard) Remove(key string) int { - if shard == nil { - panic("shard is nil") - } - - node := shard.head - if node == nil { - // empty shard - return 0 - } else if node.key == key { - // remove first node - shard.head = node.next + if _, ok := shard.m[key]; ok { + shard.m[key] = val return 1 } else { - prev := node - node = node.next - for node != nil { - if node.key == key { - prev.next = node.next - return 1 - } - prev = node - node = node.next - } + return 0 } - return 0 } // return the number of deleted key-value @@ -434,144 +150,23 @@ func (dict *Dict) Remove(key string)(result int) { index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.Lock() + defer shard.mutex.Unlock() - rehashIndex := atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - shard.mutex.Unlock() - nextShard := dict.getNextShard(hashCode) - if nextShard == nil { - shard := dict.getShard(dict.spread(hashCode)) - shard.mutex.Lock() - result = shard.Remove(key) - shard.mutex.Unlock() - } else { - nextShard.mutex.Lock() - result = nextShard.Remove(key) - nextShard.mutex.Unlock() - } + if _, ok := shard.m[key]; ok { + delete(shard.m, key) + return 1 } else { - shard := dict.getShard(dict.spread(hashCode)) - result = shard.Remove(key) - shard.mutex.Unlock() - } - if result > 0 { - atomic.AddInt32(&dict.count, -1) + return 0 } return } func (dict *Dict) addCount() int32 { - count := atomic.AddInt32(&dict.count, 1) - table, _ := dict.table.Load().([]*Shard) - if float64(count) >= float64(len(table))*loadFactor { - dict.resize() - } - return count -} - -func (dict *Dict) resize() { - if !atomic.CompareAndSwapInt32(&dict.rehashIndex, -1, 0) { - // resize already in progress - return - } - dict.ensureNextTable() - - var wg sync.WaitGroup - wg.Add(rehashConcurrent) - for i := 0; i < rehashConcurrent; i++ { - go dict.transfer(&wg) - } - wg.Wait() - - // finish rehash - dict.nextTableMu.Lock() - dict.table.Store(dict.nextTable) - dict.nextTable = nil - atomic.StoreInt32(&dict.rehashIndex, -1) - dict.nextTableMu.Unlock() -} - -func (dict *Dict) transfer(wg *sync.WaitGroup) { - table, _ := dict.table.Load().([]*Shard) - tableSize := uint32(len(table)) - // dict.rehashIndex must >= 0 - for { - i := uint32(atomic.AddInt32(&dict.rehashIndex, 1)) - 1 - if i >= tableSize { - wg.Done() - return - } - shard := dict.getShard(i) - shard.mutex.RLock() - - nextShard0 := dict.nextTable[i] - nextShard1 := dict.nextTable[i+tableSize] - - nextShard0.mutex.RLock() - nextShard1.mutex.RLock() - - var head0, head1 *Node - var tail0, tail1 *Node - node := shard.head - for node != nil { - // split current shard to 2 shards in next table - if node.hashCode&tableSize == 0 { - if head0 == nil { - head0 = node - } else { - tail0.next = node - } - tail0 = node - } else { - if head1 == nil { - head1 = node - } else { - tail1.next = node - } - tail1 = node - } - node = node.next - } - - if tail0 != nil { - tail0.next = nil - - nextShard0.head = head0 - } - - if tail1 != nil { - tail1.next = nil - - nextShard1.head = head1 - } - - nextShard1.mutex.RUnlock() - nextShard0.mutex.RUnlock() - shard.mutex.RUnlock() - } + return atomic.AddInt32(&dict.count, 1) } type Consumer func(key string, val interface{})bool -func (shard *Shard) ForEach(consumer Consumer)bool { - if shard == nil { - panic("shard is nil") - } - shard.mutex.RLock() - defer shard.mutex.RUnlock() - - node := shard.head - for node != nil { - toContinue := consumer(node.key, node.val) - if !toContinue { - return false - } - node = node.next - } - return true -} - - /* * may not contains new entry inserted during traversal */ @@ -579,43 +174,14 @@ func (dict *Dict)ForEach(consumer Consumer) { if dict == nil { panic("dict is nil") } - table, ok := dict.table.Load().([]*Shard) - if !ok { - panic("load table failed") - } - var rehashIndex int32 - tableSize := len(table) - for index, shard := range table { - rehashIndex = atomic.LoadInt32(&dict.rehashIndex) - if rehashIndex >= int32(index) { - // current slot has rehashed - if dict.nextTable == nil { - // rehash has finished, traver current table - // local variable `table` will not change to nextTable - if !shard.ForEach(consumer) { - break - } - } - - i0 := index - nextShard0 := dict.nextTable[i0] - if nextShard0 != nil { - if !nextShard0.ForEach(consumer) { - break - } - } - - i1 := index + tableSize - nextShard1 := dict.nextTable[i1] - if nextShard1 != nil { - if !nextShard1.ForEach(consumer) { - break - } - } - } else { - if !shard.ForEach(consumer) { - break + for _, shard := range dict.table { + for key, value := range shard.m { + shard.mutex.RLock() + continues := consumer(key, value) + shard.mutex.RUnlock() + if !continues { + return } } } @@ -643,21 +209,10 @@ func (shard *Shard)RandomKey()string { shard.mutex.RLock() defer shard.mutex.RUnlock() - keys := make([]string, 0) - i := 0 - node := shard.head - for node != nil { - if node.key != "" { - keys = append(keys, node.key) - i++ - } - node = node.next - } - if i > 0 { - return keys[rand.Intn(i)] - } else { - return "" + for key := range shard.m { + return key } + return "" } func (dict *Dict)RandomKeys(limit int)[]string { @@ -665,8 +220,7 @@ func (dict *Dict)RandomKeys(limit int)[]string { if limit >= size { return dict.Keys() } - table, _ := dict.table.Load().([]*Shard) - shardCount := len(table) + shardCount := len(dict.table) result := make([]string, limit) for i := 0; i < limit; { @@ -689,8 +243,7 @@ func (dict *Dict)RandomDistinctKeys(limit int)[]string { return dict.Keys() } - table, _ := dict.table.Load().([]*Shard) - shardCount := len(table) + shardCount := len(dict.table) result := make(map[string]bool) for len(result) < limit { shardIndex := uint32(rand.Intn(shardCount)) diff --git a/src/datastruct/lock/lock_map.go b/src/datastruct/lock/lock_map.go index 09100fc..f927dda 100644 --- a/src/datastruct/lock/lock_map.go +++ b/src/datastruct/lock/lock_map.go @@ -67,7 +67,6 @@ func (locks *Locks)UnLock(key string) { func (locks *Locks)RUnLock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] - mu.Lock() mu.RUnlock() } diff --git a/src/db/db.go b/src/db/db.go index 5a011d9..07510e7 100644 --- a/src/db/db.go +++ b/src/db/db.go @@ -46,12 +46,12 @@ var router = MakeRouter() func MakeDB() *DB { db := &DB{ - Data: dict.Make(1024), - TTLMap: dict.Make(128), + Data: dict.Make(128), + TTLMap: dict.Make(64), Locker: lock.Make(128), interval: 5 * time.Second, - subs: dict.Make(16), + subs: dict.Make(4), subsLocker: lock.Make(16), } db.TimerTask() diff --git a/src/db/hash.go b/src/db/hash.go index 32af90a..031d080 100644 --- a/src/db/hash.go +++ b/src/db/hash.go @@ -27,7 +27,7 @@ func (db *DB) getOrInitDict(key string)(dict *Dict.Dict, inited bool, errReply r } inited = false if dict == nil { - dict = Dict.Make(0) + dict = Dict.Make(1) db.Data.Put(key, &DataEntity{ Data: dict, })