From b09dc0b99952924d855c0803857dab64cfcfdf7a Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Sun, 13 Mar 2022 18:14:34 +0800 Subject: [PATCH] =?UTF-8?q?KV=20=E5=AD=98=E5=82=A8=E4=B9=8B=E5=90=8E?= =?UTF-8?q?=E5=86=8D=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/kv.go | 83 +++++++++++------------------------------------- store/kv_test.go | 13 +------- 2 files changed, 19 insertions(+), 77 deletions(-) diff --git a/store/kv.go b/store/kv.go index 3443e13..622e2e2 100644 --- a/store/kv.go +++ b/store/kv.go @@ -2,16 +2,23 @@ package store import ( "encoding/binary" + "errors" "fmt" - - "github.com/vmihailenco/msgpack/v5" ) +// 目前是使用 ValuePairs 保存 key value pair 的 然后使用 json 的序列化到 lsmdb中 +// 之后说不定会优化这个数据存储机制 + const ( IndexSize = 5 * 4 // ko+ks+vo+vs+ttl BlockIndexSize = 4 + 1024*IndexSize // currOffset + 1024*(ko+ks+vo+vs+ttl) ) +var ( + ErrInvalidKV = errors.New("Invalid Key Value Pair!") + ErrEmptyBucket = errors.New("A empty Bucket!") +) + // [co][ko ks vo vs ttl ...][nbo][k v k v ...] [co][ko ks vo vs ttl ...][nbo][k v k v ...]... // 会返回填充过的 kvb 小心使用 @@ -65,62 +72,8 @@ func appendKV(kvb, k, v []byte, ttl uint32) []byte { return kvb } -// TODO 性能现在太差了 之后需要优化 用一下 XMM ? -func addKV(kvb, k, v []byte) (out []byte) { - if len(k) == 0 || len(v) == 0 { - return kvb - } - var m = make(map[string][]byte) - var err error - - if len(kvb) > 0 { - err = msgpack.Unmarshal(kvb, &m) - if err != nil { - panic(err) - } - } - - m[string(k)] = v - - out, err = msgpack.Marshal(m) - if err != nil { - panic(err) - } - - return out -} - -func delKV(kvb, k []byte) (out []byte) { - var m = make(map[string][]byte) - err := msgpack.Unmarshal(kvb, &m) - if err != nil { - panic(err) - } - - delete(m, string(k)) - - out, err = msgpack.Marshal(m) - if err != nil { - panic(err) - } - - return out -} - -// 遍历 key-value pair -func traverseKV(kvb []byte, cb func([]byte, []byte) error) { - var m = make(map[string][]byte) - err := msgpack.Unmarshal(kvb, &m) - if err != nil { - panic(err) - } - - for k := range m { - cb([]byte(k), m[k]) - } -} - -func rangeKV(kvb []byte, cb func([]byte, []byte) error) { +// 只读遍历 KVB +func rangeKV(kvb []byte, cb func([]byte, []byte, uint32) error) { if len(kvb) == 0 { return } @@ -131,14 +84,14 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) { } }() - decode := func(buf []byte) ([]byte, []byte) { + decode := func(buf []byte) ([]byte, []byte, uint32) { // 内层小循环是段内遍历 ko := binary.BigEndian.Uint32(buf[:]) // keyOffset kl := binary.BigEndian.Uint32(buf[4:]) // keyLen vo := binary.BigEndian.Uint32(buf[8:]) // valueOffset vl := binary.BigEndian.Uint32(buf[12:]) // valueLen - //ttl := binary.BigEndian.Uint32(buf[16:]) - return kvb[ko : ko+kl], kvb[vo : vo+vl] + ttl := binary.BigEndian.Uint32(buf[16:]) + return kvb[ko : ko+kl], kvb[vo : vo+vl], ttl } buf := kvb @@ -149,17 +102,17 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) { // 未写满的一块数据 buf = buf[4:] // 定位到当前读写指针处 for i := 4; i < int(curOffset); i += IndexSize { - key, value := decode(buf) + key, value, ttl := decode(buf) buf = buf[IndexSize:] // 移动指针 - cb(key, value) + cb(key, value, ttl) } break } else if curOffset == BlockIndexSize { // 写满的一块数据 buf = buf[4:] // 定位到当前读写指针处 for i := 4; i < BlockIndexSize; i += IndexSize { - key, value := decode(buf) + key, value, ttl := decode(buf) if i != BlockIndexSize-IndexSize { buf = buf[IndexSize:] // 移动指针 @@ -168,7 +121,7 @@ func rangeKV(kvb []byte, cb func([]byte, []byte) error) { nbo := binary.BigEndian.Uint32(buf[:4]) buf = kvb[nbo:] } - cb(key, value) + cb(key, value, ttl) } } } diff --git a/store/kv_test.go b/store/kv_test.go index bc818bc..db652a6 100644 --- a/store/kv_test.go +++ b/store/kv_test.go @@ -6,24 +6,13 @@ import ( "testing" ) -//func TestTraverseKV(t *testing.T) { -// var kvb []byte -// for i := 0; i < 1000; i++ { -// kvb = addKV(kvb, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i))) -// } -// //traverseKV(kvb, func(k []byte, v []byte) error { -// // log.Println(string(k), string(v)) -// // return nil -// //}) -//} - func TestAppendKV(t *testing.T) { var kvb []byte for i := 0; i < 20000; i++ { kvb = appendKV(kvb, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)), 1) } - rangeKV(kvb, func(k []byte, v []byte) error { + rangeKV(kvb, func(k []byte, v []byte, ttl uint32) error { log.Println(string(k), string(v)) return nil })