mirror of
https://github.com/impact-eintr/raftd.git
synced 2025-12-24 13:07:54 +08:00
KV 存储之后再设计
This commit is contained in:
83
store/kv.go
83
store/kv.go
@@ -2,16 +2,23 @@ package store
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
)
|
||||
|
||||
// 目前是使用 ValuePairs 保存 key value pair 的 然后使用 json 的序列化到 lsmdb中
|
||||
// 之后说不定会优化这个数据存储机制
|
||||
|
||||
const (
|
||||
IndexSize = 5 * 4 // ko+ks+vo+vs+ttl
|
||||
BlockIndexSize = 4 + 1024*IndexSize // currOffset + 1024*(ko+ks+vo+vs+ttl)
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidKV = errors.New("Invalid Key Value Pair!")
|
||||
ErrEmptyBucket = errors.New("A empty Bucket!")
|
||||
)
|
||||
|
||||
// [co][ko ks vo vs ttl ...][nbo][k v k v ...] [co][ko ks vo vs ttl ...][nbo][k v k v ...]...
|
||||
|
||||
// 会返回填充过的 kvb 小心使用
|
||||
@@ -65,62 +72,8 @@ func appendKV(kvb, k, v []byte, ttl uint32) []byte {
|
||||
return kvb
|
||||
}
|
||||
|
||||
// TODO 性能现在太差了 之后需要优化 用一下 XMM ?
|
||||
func addKV(kvb, k, v []byte) (out []byte) {
|
||||
if len(k) == 0 || len(v) == 0 {
|
||||
return kvb
|
||||
}
|
||||
var m = make(map[string][]byte)
|
||||
var err error
|
||||
|
||||
if len(kvb) > 0 {
|
||||
err = msgpack.Unmarshal(kvb, &m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
m[string(k)] = v
|
||||
|
||||
out, err = msgpack.Marshal(m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func delKV(kvb, k []byte) (out []byte) {
|
||||
var m = make(map[string][]byte)
|
||||
err := msgpack.Unmarshal(kvb, &m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
delete(m, string(k))
|
||||
|
||||
out, err = msgpack.Marshal(m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// 遍历 key-value pair
|
||||
func traverseKV(kvb []byte, cb func([]byte, []byte) error) {
|
||||
var m = make(map[string][]byte)
|
||||
err := msgpack.Unmarshal(kvb, &m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for k := range m {
|
||||
cb([]byte(k), m[k])
|
||||
}
|
||||
}
|
||||
|
||||
func rangeKV(kvb []byte, cb func([]byte, []byte) error) {
|
||||
// 只读遍历 KVB
|
||||
func rangeKV(kvb []byte, cb func([]byte, []byte, uint32) error) {
|
||||
if len(kvb) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -131,14 +84,14 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) {
|
||||
}
|
||||
}()
|
||||
|
||||
decode := func(buf []byte) ([]byte, []byte) {
|
||||
decode := func(buf []byte) ([]byte, []byte, uint32) {
|
||||
// 内层小循环是段内遍历
|
||||
ko := binary.BigEndian.Uint32(buf[:]) // keyOffset
|
||||
kl := binary.BigEndian.Uint32(buf[4:]) // keyLen
|
||||
vo := binary.BigEndian.Uint32(buf[8:]) // valueOffset
|
||||
vl := binary.BigEndian.Uint32(buf[12:]) // valueLen
|
||||
//ttl := binary.BigEndian.Uint32(buf[16:])
|
||||
return kvb[ko : ko+kl], kvb[vo : vo+vl]
|
||||
ttl := binary.BigEndian.Uint32(buf[16:])
|
||||
return kvb[ko : ko+kl], kvb[vo : vo+vl], ttl
|
||||
}
|
||||
|
||||
buf := kvb
|
||||
@@ -149,17 +102,17 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) {
|
||||
// 未写满的一块数据
|
||||
buf = buf[4:] // 定位到当前读写指针处
|
||||
for i := 4; i < int(curOffset); i += IndexSize {
|
||||
key, value := decode(buf)
|
||||
key, value, ttl := decode(buf)
|
||||
|
||||
buf = buf[IndexSize:] // 移动指针
|
||||
cb(key, value)
|
||||
cb(key, value, ttl)
|
||||
}
|
||||
break
|
||||
} else if curOffset == BlockIndexSize {
|
||||
// 写满的一块数据
|
||||
buf = buf[4:] // 定位到当前读写指针处
|
||||
for i := 4; i < BlockIndexSize; i += IndexSize {
|
||||
key, value := decode(buf)
|
||||
key, value, ttl := decode(buf)
|
||||
|
||||
if i != BlockIndexSize-IndexSize {
|
||||
buf = buf[IndexSize:] // 移动指针
|
||||
@@ -168,7 +121,7 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) {
|
||||
nbo := binary.BigEndian.Uint32(buf[:4])
|
||||
buf = kvb[nbo:]
|
||||
}
|
||||
cb(key, value)
|
||||
cb(key, value, ttl)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,24 +6,13 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
//func TestTraverseKV(t *testing.T) {
|
||||
// var kvb []byte
|
||||
// for i := 0; i < 1000; i++ {
|
||||
// kvb = addKV(kvb, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
|
||||
// }
|
||||
// //traverseKV(kvb, func(k []byte, v []byte) error {
|
||||
// // log.Println(string(k), string(v))
|
||||
// // return nil
|
||||
// //})
|
||||
//}
|
||||
|
||||
func TestAppendKV(t *testing.T) {
|
||||
var kvb []byte
|
||||
for i := 0; i < 20000; i++ {
|
||||
kvb = appendKV(kvb, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)), 1)
|
||||
}
|
||||
|
||||
rangeKV(kvb, func(k []byte, v []byte) error {
|
||||
rangeKV(kvb, func(k []byte, v []byte, ttl uint32) error {
|
||||
log.Println(string(k), string(v))
|
||||
return nil
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user