From 3c67facf29377fefbe5eb49132676f795b3e2eca Mon Sep 17 00:00:00 2001 From: Nicolas JUHEL Date: Mon, 12 Apr 2021 18:06:35 +0200 Subject: [PATCH] Add package NutsDB : - use lib nutsdb (https://github.com/xujiajun/nutsdb) - implement client to use embedded db cluster - convert nutsdb lib as clustered nutsdb with lib cluster (based on dragonboat) - implement disk backend lib for dragonboat - add complete config with nutsdb, node & cluster with validator --- nutsdb/Command.go | 232 +++++++ nutsdb/client.go | 1464 +++++++++++++++++++++++++++++++++++++++++++ nutsdb/cmdCode.go | 420 +++++++++++++ nutsdb/config.go | 107 ++++ nutsdb/configDb.go | 126 ++++ nutsdb/configFs.go | 196 ++++++ nutsdb/entryKv.go | 242 +++++++ nutsdb/errors.go | 150 +++++ nutsdb/interface.go | 65 ++ nutsdb/model.go | 203 ++++++ nutsdb/node.go | 354 +++++++++++ nutsdb/options.go | 210 +++++++ nutsdb/snap.go | 175 ++++++ 13 files changed, 3944 insertions(+) create mode 100644 nutsdb/Command.go create mode 100644 nutsdb/client.go create mode 100644 nutsdb/cmdCode.go create mode 100644 nutsdb/config.go create mode 100644 nutsdb/configDb.go create mode 100644 nutsdb/configFs.go create mode 100644 nutsdb/entryKv.go create mode 100644 nutsdb/errors.go create mode 100644 nutsdb/interface.go create mode 100644 nutsdb/model.go create mode 100644 nutsdb/node.go create mode 100644 nutsdb/options.go create mode 100644 nutsdb/snap.go diff --git a/nutsdb/Command.go b/nutsdb/Command.go new file mode 100644 index 0000000..8df1134 --- /dev/null +++ b/nutsdb/Command.go @@ -0,0 +1,232 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "github.com/xujiajun/nutsdb" + "github.com/xujiajun/nutsdb/ds/zset" +) + +type Commands interface { + CommandTransaction + CommandBPTree + CommandSet + CommandList + CommandZSet +} + +type CommandTransaction interface { + // Put sets the value for a key in the bucket. + Put(bucket string, key, value []byte, ttl uint32) error + + // PutWithTimestamp sets the value for a key in the bucket but allow capabilities to custom the timestamp for ttl. + PutWithTimestamp(bucket string, key, value []byte, ttl uint32, timestamp uint64) error +} + +type CommandBPTree interface { + // Get retrieves the value for a key in the bucket. + // The returned value is only valid for the life of the transaction. + Get(bucket string, key []byte) (e *nutsdb.Entry, err error) + + //GetAll returns all keys and values of the bucket stored at given bucket. + GetAll(bucket string) (entries nutsdb.Entries, err error) + + // RangeScan query a range at given bucket, start and end slice. + RangeScan(bucket string, start, end []byte) (es nutsdb.Entries, err error) + + // PrefixScan iterates over a key prefix at given bucket, prefix and limitNum. + // LimitNum will limit the number of entries return. + PrefixScan(bucket string, prefix []byte, offsetNum int, limitNum int) (es nutsdb.Entries, off int, err error) + + // PrefixSearchScan iterates over a key prefix at given bucket, prefix, match regular expression and limitNum. + // LimitNum will limit the number of entries return. + PrefixSearchScan(bucket string, prefix []byte, reg string, offsetNum int, limitNum int) (es nutsdb.Entries, off int, err error) + + // Delete removes a key from the bucket at given bucket and key. + Delete(bucket string, key []byte) error + + // FindTxIDOnDisk returns if txId on disk at given fid and txID. + FindTxIDOnDisk(fID, txID uint64) (ok bool, err error) + + // FindOnDisk returns entry on disk at given fID, rootOff and key. + FindOnDisk(fID uint64, rootOff uint64, key, newKey []byte) (entry *nutsdb.Entry, err error) + + // FindLeafOnDisk returns binary leaf node on disk at given fId, rootOff and key. + FindLeafOnDisk(fID int64, rootOff int64, key, newKey []byte) (bn *nutsdb.BinaryNode, err error) +} + +type CommandSet interface { + // SAdd adds the specified members to the set stored int the bucket at given bucket,key and items. + SAdd(bucket string, key []byte, items ...[]byte) error + + // SRem removes the specified members from the set stored int the bucket at given bucket,key and items. + SRem(bucket string, key []byte, items ...[]byte) error + + // SAreMembers returns if the specified members are the member of the set int the bucket at given bucket,key and items. + SAreMembers(bucket string, key []byte, items ...[]byte) (bool, error) + + // SIsMember returns if member is a member of the set stored int the bucket at given bucket,key and item. + SIsMember(bucket string, key, item []byte) (bool, error) + + // SMembers returns all the members of the set value stored int the bucket at given bucket and key. + SMembers(bucket string, key []byte) (list [][]byte, err error) + + // SHasKey returns if the set in the bucket at given bucket and key. + SHasKey(bucket string, key []byte) (bool, error) + + // SPop removes and returns one or more random elements from the set value store in the bucket at given bucket and key. + SPop(bucket string, key []byte) ([]byte, error) + + // SCard returns the set cardinality (number of elements) of the set stored in the bucket at given bucket and key. + SCard(bucket string, key []byte) (int, error) + + // SDiffByOneBucket returns the members of the set resulting from the difference + // between the first set and all the successive sets in one bucket. + SDiffByOneBucket(bucket string, key1, key2 []byte) (list [][]byte, err error) + + // SDiffByTwoBuckets returns the members of the set resulting from the difference + // between the first set and all the successive sets in two buckets. + SDiffByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2 []byte) (list [][]byte, err error) + + // SMoveByOneBucket moves member from the set at source to the set at destination in one bucket. + SMoveByOneBucket(bucket string, key1, key2, item []byte) (bool, error) + + // SMoveByTwoBuckets moves member from the set at source to the set at destination in two buckets. + SMoveByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2, item []byte) (bool, error) + + // SUnionByOneBucket the members of the set resulting from the union of all the given sets in one bucket. + SUnionByOneBucket(bucket string, key1, key2 []byte) (list [][]byte, err error) + + // SUnionByTwoBuckets the members of the set resulting from the union of all the given sets in two buckets. + SUnionByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2 []byte) (list [][]byte, err error) +} + +type CommandList interface { + // RPop removes and returns the last element of the list stored in the bucket at given bucket and key. + RPop(bucket string, key []byte) (item []byte, err error) + + // RPeek returns the last element of the list stored in the bucket at given bucket and key. + RPeek(bucket string, key []byte) (item []byte, err error) + + // RPush inserts the values at the tail of the list stored in the bucket at given bucket,key and values. + RPush(bucket string, key []byte, values ...[]byte) error + + // LPush inserts the values at the head of the list stored in the bucket at given bucket,key and values. + LPush(bucket string, key []byte, values ...[]byte) error + + // LPop removes and returns the first element of the list stored in the bucket at given bucket and key. + LPop(bucket string, key []byte) (item []byte, err error) + + // LPeek returns the first element of the list stored in the bucket at given bucket and key. + LPeek(bucket string, key []byte) (item []byte, err error) + + // LSize returns the size of key in the bucket in the bucket at given bucket and key. + LSize(bucket string, key []byte) (int, error) + + // LRange returns the specified elements of the list stored in the bucket at given bucket,key, start and end. + // The offsets start and stop are zero-based indexes 0 being the first element of the list (the head of the list), + // 1 being the next element and so on. + // Start and end can also be negative numbers indicating offsets from the end of the list, + // where -1 is the last element of the list, -2 the penultimate element and so on. + LRange(bucket string, key []byte, start, end int) (list [][]byte, err error) + + // LRem removes the first count occurrences of elements equal to value from the list stored in the bucket at given bucket,key,count. + // The count argument influences the operation in the following ways: + // count > 0: Remove elements equal to value moving from head to tail. + // count < 0: Remove elements equal to value moving from tail to head. + // count = 0: Remove all elements equal to value. + LRem(bucket string, key []byte, count int, value []byte) (removedNum int, err error) + + // LSet sets the list element at index to value. + LSet(bucket string, key []byte, index int, value []byte) error + + // LTrim trims an existing list so that it will contain only the specified range of elements specified. + // the offsets start and stop are zero-based indexes 0 being the first element of the list (the head of the list), + // 1 being the next element and so on. + // start and end can also be negative numbers indicating offsets from the end of the list, + // where -1 is the last element of the list, -2 the penultimate element and so on. + LTrim(bucket string, key []byte, start, end int) error +} + +type CommandZSet interface { + // ZAdd adds the specified member key with the specified score and specified val to the sorted set stored at bucket. + ZAdd(bucket string, key []byte, score float64, val []byte) error + + // ZMembers returns all the members of the set value stored at bucket. + ZMembers(bucket string) (map[string]*zset.SortedSetNode, error) + + // ZCard returns the sorted set cardinality (number of elements) of the sorted set stored at bucket. + ZCard(bucket string) (int, error) + + // ZCount returns the number of elements in the sorted set at bucket with a score between min and max and opts. + // opts includes the following parameters: + // Limit int // limit the max nodes to return + // ExcludeStart bool // exclude start value, so it search in interval (start, end] or (start, end) + // ExcludeEnd bool // exclude end value, so it search in interval [start, end) or (start, end) + ZCount(bucket string, start, end float64, opts *zset.GetByScoreRangeOptions) (int, error) + + // ZPopMax removes and returns the member with the highest score in the sorted set stored at bucket. + ZPopMax(bucket string) (*zset.SortedSetNode, error) + + // ZPopMin removes and returns the member with the lowest score in the sorted set stored at bucket. + ZPopMin(bucket string) (*zset.SortedSetNode, error) + + // ZPeekMax returns the member with the highest score in the sorted set stored at bucket. + ZPeekMax(bucket string) (*zset.SortedSetNode, error) + + // ZPeekMin returns the member with the lowest score in the sorted set stored at bucket. + ZPeekMin(bucket string) (*zset.SortedSetNode, error) + + // ZRangeByScore returns all the elements in the sorted set at bucket with a score between min and max. + ZRangeByScore(bucket string, start, end float64, opts *zset.GetByScoreRangeOptions) ([]*zset.SortedSetNode, error) + + // ZRangeByRank returns all the elements in the sorted set in one bucket and key + // with a rank between start and end (including elements with rank equal to start or end). + ZRangeByRank(bucket string, start, end int) ([]*zset.SortedSetNode, error) + + // ZRem removes the specified members from the sorted set stored in one bucket at given bucket and key. + ZRem(bucket, key string) error + + // ZRemRangeByRank removes all elements in the sorted set stored in one bucket at given bucket with rank between start and end. + // the rank is 1-based integer. Rank 1 means the first node; Rank -1 means the last node. + ZRemRangeByRank(bucket string, start, end int) error + + // ZRank returns the rank of member in the sorted set stored in the bucket at given bucket and key, + // with the scores ordered from low to high. + ZRank(bucket string, key []byte) (int, error) + + // ZRevRank returns the rank of member in the sorted set stored in the bucket at given bucket and key, + // with the scores ordered from high to low. + ZRevRank(bucket string, key []byte) (int, error) + + // ZScore returns the score of member in the sorted set in the bucket at given bucket and key. + ZScore(bucket string, key []byte) (float64, error) + + // ZGetByKey returns node in the bucket at given bucket and key. + ZGetByKey(bucket string, key []byte) (*zset.SortedSetNode, error) +} diff --git a/nutsdb/client.go b/nutsdb/client.go new file mode 100644 index 0000000..a100534 --- /dev/null +++ b/nutsdb/client.go @@ -0,0 +1,1464 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "context" + "time" + + "github.com/lni/dragonboat/v3/statemachine" + libclu "github.com/nabbar/golib/cluster" + liberr "github.com/nabbar/golib/errors" + "github.com/xujiajun/nutsdb" + "github.com/xujiajun/nutsdb/ds/zset" +) + +type Client interface { + Commands +} + +type clientNutDB struct { + x context.Context + t time.Duration + c func() libclu.Cluster + w func(ctx context.Context, tick time.Duration) +} + +func (c *clientNutDB) call(cmd *CommandRequest, read bool) (*CommandResponse, liberr.Error) { + var ( + p []byte + e liberr.Error + i interface{} + d statemachine.Result + r *CommandResponse + + ok bool + ) + + if read { + c.w(c.x, c.t) + if i, e = c.c().SyncRead(c.x, cmd); e != nil { + return nil, e + } else if r, ok = i.(*CommandResponse); !ok { + return nil, ErrorClientCommandResponseInvalid.Error(nil) + } else { + return r, nil + } + } else if p, e = cmd.EncodeRequest(); e != nil { + return nil, e + } else { + c.w(c.x, c.t) + if d, e = c.c().SyncPropose(c.x, c.c().GetNoOPSession(), p); e != nil { + return nil, e + } else if r, e = cmd.DecodeResult(d.Data); e != nil { + return nil, e + } else { + return r, nil + } + } +} + +// nolint #dupl +func (c *clientNutDB) Put(bucket string, key, value []byte, ttl uint32) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, value, ttl) + + if r, f = c.call(d, false); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) PutWithTimestamp(bucket string, key, value []byte, ttl uint32, timestamp uint64) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, value, ttl, timestamp) + + if r, f = c.call(d, false); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) Get(bucket string, key []byte) (e *nutsdb.Entry, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if e, k = r.Value[0].(*nutsdb.Entry); !k { + e = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) GetAll(bucket string) (entries nutsdb.Entries, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if entries, k = r.Value[0].(nutsdb.Entries); !k { + entries = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) RangeScan(bucket string, start, end []byte) (es nutsdb.Entries, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, start, end) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if es, k = r.Value[0].(nutsdb.Entries); !k { + es = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) PrefixScan(bucket string, prefix []byte, offsetNum int, limitNum int) (es nutsdb.Entries, off int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, prefix, offsetNum, limitNum) + + if r, f = c.call(d, true); f != nil { + return nil, 0, f + } else if r == nil { + return nil, 0, nil + } else if r.Error != nil { + return nil, 0, r.Error + } else if len(r.Value) < 2 { + return nil, 0, nil + } + + if es, k = r.Value[0].(nutsdb.Entries); !k { + es = nil + } + + if off, k = r.Value[1].(int); !k { + off = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) PrefixSearchScan(bucket string, prefix []byte, reg string, offsetNum int, limitNum int) (es nutsdb.Entries, off int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, prefix, reg, offsetNum, limitNum) + + if r, f = c.call(d, true); f != nil { + return nil, 0, f + } else if r == nil { + return nil, 0, nil + } else if r.Error != nil { + return nil, 0, r.Error + } else if len(r.Value) < 2 { + return nil, 0, nil + } + + if es, k = r.Value[0].(nutsdb.Entries); !k { + es = nil + } + + if off, k = r.Value[1].(int); !k { + off = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) Delete(bucket string, key []byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) FindTxIDOnDisk(fID, txID uint64) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(fID, txID) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) FindOnDisk(fID uint64, rootOff uint64, key, newKey []byte) (entry *nutsdb.Entry, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(fID, rootOff, key, newKey) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if entry, k = r.Value[0].(*nutsdb.Entry); !k { + entry = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) FindLeafOnDisk(fID int64, rootOff int64, key, newKey []byte) (bn *nutsdb.BinaryNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(fID, rootOff, key, newKey) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if bn, k = r.Value[0].(*nutsdb.BinaryNode); !k { + bn = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SAdd(bucket string, key []byte, items ...[]byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, items) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) SRem(bucket string, key []byte, items ...[]byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, items) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) SAreMembers(bucket string, key []byte, items ...[]byte) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, items) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SIsMember(bucket string, key, item []byte) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, item) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SMembers(bucket string, key []byte) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SHasKey(bucket string, key []byte) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SPop(bucket string, key []byte) (val []byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if val, k = r.Value[0].([]byte); !k { + val = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SCard(bucket string, key []byte) (card int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if card, k = r.Value[0].(int); !k { + card = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SDiffByOneBucket(bucket string, key1, key2 []byte) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key1, key2) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SDiffByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2 []byte) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket1, key1, bucket2, key2) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SMoveByOneBucket(bucket string, key1, key2, item []byte) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key1, key2, item) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SMoveByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2, item []byte) (ok bool, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket1, key1, bucket2, key2, item) + + if r, f = c.call(d, true); f != nil { + return false, f + } else if r == nil { + return false, nil + } else if r.Error != nil { + return false, r.Error + } else if len(r.Value) < 1 { + return false, nil + } + + if ok, k = r.Value[0].(bool); !k { + ok = false + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SUnionByOneBucket(bucket string, key1, key2 []byte) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key1, key2) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) SUnionByTwoBuckets(bucket1 string, key1 []byte, bucket2 string, key2 []byte) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket1, key1, bucket2, key2) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) RPop(bucket string, key []byte) (item []byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].([]byte); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) RPeek(bucket string, key []byte) (item []byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].([]byte); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) RPush(bucket string, key []byte, values ...[]byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, values) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) LPush(bucket string, key []byte, values ...[]byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, values) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) LPop(bucket string, key []byte) (item []byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].([]byte); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) LPeek(bucket string, key []byte) (item []byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].([]byte); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) LSize(bucket string, key []byte) (size int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if size, k = r.Value[0].(int); !k { + size = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) LRange(bucket string, key []byte, start, end int) (list [][]byte, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, start, end) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([][]byte); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) LRem(bucket string, key []byte, count int, value []byte) (removedNum int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, count, value) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if removedNum, k = r.Value[0].(int); !k { + removedNum = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) LSet(bucket string, key []byte, index int, value []byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, index, value) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) LTrim(bucket string, key []byte, start, end int) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, start, end) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) ZAdd(bucket string, key []byte, score float64, val []byte) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key, score, val) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) ZMembers(bucket string) (list map[string]*zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].(map[string]*zset.SortedSetNode); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZCard(bucket string) (card int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if card, k = r.Value[0].(int); !k { + card = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZCount(bucket string, start, end float64, opts *zset.GetByScoreRangeOptions) (number int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, start, end, opts) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if number, k = r.Value[0].(int); !k { + number = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZPopMax(bucket string) (item *zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].(*zset.SortedSetNode); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZPopMin(bucket string) (item *zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].(*zset.SortedSetNode); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZPeekMax(bucket string) (item *zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].(*zset.SortedSetNode); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZPeekMin(bucket string) (item *zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].(*zset.SortedSetNode); !k { + item = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZRangeByScore(bucket string, start, end float64, opts *zset.GetByScoreRangeOptions) (list []*zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, start, end, opts) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([]*zset.SortedSetNode); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZRangeByRank(bucket string, start, end int) (list []*zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, start, end) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if list, k = r.Value[0].([]*zset.SortedSetNode); !k { + list = nil + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZRem(bucket, key string) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) ZRemRangeByRank(bucket string, start, end int) error { + var ( + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, start, end) + + if r, f = c.call(d, true); f != nil { + return f + } else if r == nil { + return nil + } else if r.Error != nil { + return r.Error + } + + return nil +} + +// nolint #dupl +func (c *clientNutDB) ZRank(bucket string, key []byte) (rank int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if rank, k = r.Value[0].(int); !k { + rank = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZRevRank(bucket string, key []byte) (rank int, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if rank, k = r.Value[0].(int); !k { + rank = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZScore(bucket string, key []byte) (score float64, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return 0, f + } else if r == nil { + return 0, nil + } else if r.Error != nil { + return 0, r.Error + } else if len(r.Value) < 1 { + return 0, nil + } + + if score, k = r.Value[0].(float64); !k { + score = 0 + } + + return +} + +// nolint #dupl +func (c *clientNutDB) ZGetByKey(bucket string, key []byte) (item *zset.SortedSetNode, err error) { + var ( + k bool + f liberr.Error + r *CommandResponse + d *CommandRequest + ) + + d = NewCommandByCaller(bucket, key) + + if r, f = c.call(d, true); f != nil { + return nil, f + } else if r == nil { + return nil, nil + } else if r.Error != nil { + return nil, r.Error + } else if len(r.Value) < 1 { + return nil, nil + } + + if item, k = r.Value[0].(*zset.SortedSetNode); !k { + item = nil + } + + return +} diff --git a/nutsdb/cmdCode.go b/nutsdb/cmdCode.go new file mode 100644 index 0000000..0482b20 --- /dev/null +++ b/nutsdb/cmdCode.go @@ -0,0 +1,420 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +type CmdCode uint32 + +const ( + // CmdUnknown is no Command. + CmdUnknown CmdCode = iota + // Command for transaction. + CmdPut + CmdPutWithTimestamp + // Command for BPTree. + CmdGet + CmdGetAll + CmdRangeScan + CmdPrefixScan + CmdPrefixSearchScan + CmdDelete + CmdFindTxIDOnDisk + CmdFindOnDisk + CmdFindLeafOnDisk + // Command for Set. + CmdSAdd + CmdSRem + CmdSAreMembers + CmdSIsMember + CmdSMembers + CmdSHasKey + CmdSPop + CmdSCard + CmdSDiffByOneBucket + CmdSDiffByTwoBuckets + CmdSMoveByOneBucket + CmdSMoveByTwoBuckets + CmdSUnionByOneBucket + CmdSUnionByTwoBuckets + // Command for List. + CmdRPop + CmdRPeek + CmdRPush + CmdLPush + CmdLPop + CmdLPeek + CmdLSize + CmdLRange + CmdLRem + CmdLSet + CmdLTrim + // Command for ZSet. + CmdZAdd + CmdZMembers + CmdZCard + CmdZCount + CmdZPopMax + CmdZPopMin + CmdZPeekMax + CmdZPeekMin + CmdZRangeByScore + CmdZRangeByRank + CmdZRem + CmdZRemRangeByRank + CmdZRank + CmdZRevRank + CmdZScore + CmdZGetByKey +) + +// nolint #funlen +func CmdCodeFromName(name string) CmdCode { + switch name { + case CmdPut.Name(): + return CmdPut + + case CmdPutWithTimestamp.Name(): + return CmdPutWithTimestamp + + case CmdGet.Name(): + return CmdGet + + case CmdGetAll.Name(): + return CmdGetAll + + case CmdRangeScan.Name(): + return CmdRangeScan + + case CmdPrefixScan.Name(): + return CmdPrefixScan + + case CmdPrefixSearchScan.Name(): + return CmdPrefixSearchScan + + case CmdDelete.Name(): + return CmdDelete + + case CmdFindTxIDOnDisk.Name(): + return CmdFindTxIDOnDisk + + case CmdFindOnDisk.Name(): + return CmdFindOnDisk + + case CmdFindLeafOnDisk.Name(): + return CmdFindLeafOnDisk + + case CmdSAdd.Name(): + return CmdSAdd + + case CmdSRem.Name(): + return CmdSRem + + case CmdSAreMembers.Name(): + return CmdSAreMembers + + case CmdSIsMember.Name(): + return CmdSIsMember + + case CmdSMembers.Name(): + return CmdSMembers + + case CmdSHasKey.Name(): + return CmdSHasKey + + case CmdSPop.Name(): + return CmdSPop + + case CmdSCard.Name(): + return CmdSCard + + case CmdSDiffByOneBucket.Name(): + return CmdSDiffByOneBucket + + case CmdSDiffByTwoBuckets.Name(): + return CmdSDiffByTwoBuckets + + case CmdSMoveByOneBucket.Name(): + return CmdSMoveByOneBucket + + case CmdSMoveByTwoBuckets.Name(): + return CmdSMoveByTwoBuckets + + case CmdSUnionByOneBucket.Name(): + return CmdSUnionByOneBucket + + case CmdSUnionByTwoBuckets.Name(): + return CmdSUnionByTwoBuckets + + case CmdRPop.Name(): + return CmdRPop + + case CmdRPeek.Name(): + return CmdRPeek + + case CmdRPush.Name(): + return CmdRPush + + case CmdLPush.Name(): + return CmdLPush + + case CmdLPop.Name(): + return CmdLPop + + case CmdLPeek.Name(): + return CmdLPeek + + case CmdLSize.Name(): + return CmdLSize + + case CmdLRange.Name(): + return CmdLRange + + case CmdLRem.Name(): + return CmdLRem + + case CmdLSet.Name(): + return CmdLSet + + case CmdLTrim.Name(): + return CmdLTrim + + case CmdZAdd.Name(): + return CmdZAdd + + case CmdZMembers.Name(): + return CmdZMembers + + case CmdZCard.Name(): + return CmdZCard + + case CmdZCount.Name(): + return CmdZCount + + case CmdZPopMax.Name(): + return CmdZPopMax + + case CmdZPopMin.Name(): + return CmdZPopMin + + case CmdZPeekMax.Name(): + return CmdZPeekMax + + case CmdZPeekMin.Name(): + return CmdZPeekMin + + case CmdZRangeByScore.Name(): + return CmdZRangeByScore + + case CmdZRangeByRank.Name(): + return CmdZRangeByRank + + case CmdZRem.Name(): + return CmdZRem + + case CmdZRemRangeByRank.Name(): + return CmdZRemRangeByRank + + case CmdZRank.Name(): + return CmdZRank + + case CmdZRevRank.Name(): + return CmdZRevRank + + case CmdZScore.Name(): + return CmdZScore + + case CmdZGetByKey.Name(): + return CmdZGetByKey + + default: + return CmdUnknown + } +} + +// nolint #funlen +func (c CmdCode) Name() string { + switch c { + case CmdPut: + return "Put" + + case CmdPutWithTimestamp: + return "PutWithTimestamp" + + case CmdGet: + return "Get" + + case CmdGetAll: + return "GetAll" + + case CmdRangeScan: + return "RangeScan" + + case CmdPrefixScan: + return "PrefixScan" + + case CmdPrefixSearchScan: + return "PrefixSearchScan" + + case CmdDelete: + return "Delete" + + case CmdFindTxIDOnDisk: + return "FindTxIDOnDisk" + + case CmdFindOnDisk: + return "FindOnDisk" + + case CmdFindLeafOnDisk: + return "FindLeafOnDisk" + + case CmdSAdd: + return "SAdd" + + case CmdSRem: + return "SRem" + + case CmdSAreMembers: + return "SAreMembers" + + case CmdSIsMember: + return "SIsMember" + + case CmdSMembers: + return "SMembers" + + case CmdSHasKey: + return "SHasKey" + + case CmdSPop: + return "SPop" + + case CmdSCard: + return "SCard" + + case CmdSDiffByOneBucket: + return "SDiffByOneBucket" + + case CmdSDiffByTwoBuckets: + return "SDiffByTwoBuckets" + + case CmdSMoveByOneBucket: + return "SMoveByOneBucket" + + case CmdSMoveByTwoBuckets: + return "SMoveByTwoBuckets" + + case CmdSUnionByOneBucket: + return "SUnionByOneBucket" + + case CmdSUnionByTwoBuckets: + return "SUnionByTwoBuckets" + + case CmdRPop: + return "RPop" + + case CmdRPeek: + return "RPeek" + + case CmdRPush: + return "RPush" + + case CmdLPush: + return "LPush" + + case CmdLPop: + return "LPop" + + case CmdLPeek: + return "LPeek" + + case CmdLSize: + return "LSize" + + case CmdLRange: + return "LRange" + + case CmdLRem: + return "LRem" + + case CmdLSet: + return "LSet" + + case CmdLTrim: + return "LTrim" + + case CmdZAdd: + return "ZAdd" + + case CmdZMembers: + return "ZMembers" + + case CmdZCard: + return "ZCard" + + case CmdZCount: + return "ZCount" + + case CmdZPopMax: + return "ZPopMax" + + case CmdZPopMin: + return "ZPopMin" + + case CmdZPeekMax: + return "ZPeekMax" + + case CmdZPeekMin: + return "ZPeekMin" + + case CmdZRangeByScore: + return "ZRangeByScore" + + case CmdZRangeByRank: + return "ZRangeByRank" + + case CmdZRem: + return "ZRem" + + case CmdZRemRangeByRank: + return "ZRemRangeByRank" + + case CmdZRank: + return "ZRank" + + case CmdZRevRank: + return "ZRevRank" + + case CmdZScore: + return "ZScore" + + case CmdZGetByKey: + return "ZGetByKey" + + default: + return "" + } +} diff --git a/nutsdb/config.go b/nutsdb/config.go new file mode 100644 index 0000000..3070a25 --- /dev/null +++ b/nutsdb/config.go @@ -0,0 +1,107 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "fmt" + + "github.com/go-playground/validator/v10" + "github.com/nabbar/golib/cluster" + liberr "github.com/nabbar/golib/errors" + "github.com/xujiajun/nutsdb" +) + +type Config struct { + DB NutsDBOptions `mapstructure:"db" json:"db" yaml:"db" toml:"db"` + Cluster cluster.Config `mapstructure:"cluster" json:"cluster" yaml:"cluster" toml:"cluster"` + Directory NutsDBFolder `mapstructure:"directories" json:"directories" yaml:"directories" toml:"directories" ` +} + +func (c Config) GetConfigFolder() NutsDBFolder { + return c.Directory +} + +func (c Config) GetConfigDB() (nutsdb.Options, liberr.Error) { + if dir, err := c.Directory.GetDirectoryData(); err != nil { + return nutsdb.Options{}, err + } else { + return c.DB.GetNutsDBOptions(dir), nil + } +} + +func (c Config) GetConfigCluster() (cluster.Config, liberr.Error) { + cfg := c.Cluster + + if dir, err := c.Directory.GetDirectoryWal(); err != nil { + return cfg, err + } else { + cfg.Node.WALDir = dir + } + + if dir, err := c.Directory.GetDirectoryHost(); err != nil { + return cfg, err + } else { + cfg.Node.NodeHostDir = dir + } + + return cfg, nil +} + +func (c Config) GetOptions() (Options, liberr.Error) { + return NewOptions(c.DB, c.Directory) +} + +func (c Config) Validate() liberr.Error { + val := validator.New() + err := val.Struct(c) + + if e, ok := err.(*validator.InvalidValidationError); ok { + return ErrorValidateConfig.ErrorParent(e) + } + + out := ErrorValidateConfig.Error(nil) + + for _, e := range err.(validator.ValidationErrors) { + //nolint goerr113 + out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag())) + } + + if out.HasParent() { + return out + } + + return nil +} + +func (c Config) ValidateDB() liberr.Error { + return c.DB.Validate() +} + +func (c Config) ValidateCluster() liberr.Error { + return c.Cluster.Validate() +} diff --git a/nutsdb/configDb.go b/nutsdb/configDb.go new file mode 100644 index 0000000..cc44f4d --- /dev/null +++ b/nutsdb/configDb.go @@ -0,0 +1,126 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "fmt" + + "github.com/go-playground/validator/v10" + liberr "github.com/nabbar/golib/errors" + "github.com/xujiajun/nutsdb" +) + +type NutsDBOptions struct { + // EntryIdxMode represents using which mode to index the entries. + EntryIdxMode nutsdb.EntryIdxMode `mapstructure:"entry_idx_mode" json:"entry_idx_mode" yaml:"entry_idx_mode" toml:"entry_idx_mode"` + + // RWMode represents the read and write mode. + // RWMode includes two options: FileIO and MMap. + // FileIO represents the read and write mode using standard I/O. + // MMap represents the read and write mode using mmap. + RWMode nutsdb.RWMode `mapstructure:"rw_mode" json:"rw_mode" yaml:"rw_mode" toml:"rw_mode"` + + // SegmentSize default value is 8 MBytes + SegmentSize int64 `mapstructure:"segment_size" json:"segment_size" yaml:"segment_size" toml:"segment_size"` + + // SyncEnable represents if call Sync() function. + // if SyncEnable is false, high write performance but potential data loss likely. + // if SyncEnable is true, slower but persistent. + SyncEnable bool `mapstructure:"sync_enable" json:"sync_enable" yaml:"sync_enable" toml:"sync_enable"` + + // StartFileLoadingMode represents when open a database which RWMode to load files. + StartFileLoadingMode nutsdb.RWMode `mapstructure:"start_file_loading_mode" json:"start_file_loading_mode" yaml:"start_file_loading_mode" toml:"start_file_loading_mode"` +} + +func (o NutsDBOptions) GetNutsDBOptions(dataDir string) nutsdb.Options { + d := nutsdb.DefaultOptions + + if len(dataDir) < 1 { + d.RWMode = nutsdb.MMap + d.StartFileLoadingMode = nutsdb.MMap + } else { + d.Dir = dataDir + + switch o.RWMode { + case nutsdb.MMap: + d.RWMode = nutsdb.MMap + default: + d.RWMode = nutsdb.FileIO + } + + switch o.StartFileLoadingMode { + case nutsdb.MMap: + d.StartFileLoadingMode = nutsdb.MMap + default: + d.StartFileLoadingMode = nutsdb.FileIO + } + } + + switch o.EntryIdxMode { + case nutsdb.HintKeyAndRAMIdxMode: + d.EntryIdxMode = nutsdb.HintKeyAndRAMIdxMode + case nutsdb.HintBPTSparseIdxMode: + d.EntryIdxMode = nutsdb.HintBPTSparseIdxMode + default: + d.EntryIdxMode = nutsdb.HintKeyValAndRAMIdxMode + } + + if o.SegmentSize > 0 { + d.SegmentSize = o.SegmentSize + } + + if o.SyncEnable { + d.SyncEnable = true + } else { + d.SyncEnable = false + } + + return d +} + +func (o NutsDBOptions) Validate() liberr.Error { + val := validator.New() + err := val.Struct(o) + + if e, ok := err.(*validator.InvalidValidationError); ok { + return ErrorValidateConfig.ErrorParent(e) + } + + out := ErrorValidateConfig.Error(nil) + + for _, e := range err.(validator.ValidationErrors) { + //nolint goerr113 + out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag())) + } + + if out.HasParent() { + return out + } + + return nil +} diff --git a/nutsdb/configFs.go b/nutsdb/configFs.go new file mode 100644 index 0000000..9f2f779 --- /dev/null +++ b/nutsdb/configFs.go @@ -0,0 +1,196 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/go-playground/validator/v10" + liberr "github.com/nabbar/golib/errors" +) + +const ( + _DefaultFolderData = "data" + _DefaultFolderBackup = "backup" + _DefaultFolderWal = "wal" + _DefaultFolderHost = "host" +) + +type NutsDBFolder struct { + // Working represents the main working folder witch will include sub directories : data, backup, temp... + // If the base directory is empty, all the sub directory will be absolute directories. + Base string `mapstructure:"base" json:"base" yaml:"base" toml:"base"` + + // Data represents the sub-dir for the opening database. + // By default, it will use `data` as sub folder + Data string `mapstructure:"sub_data" json:"sub_data" yaml:"sub_data" toml:"sub_data"` + + // Backup represents the sub-dir with all backup sub-folder. + // By default, it will use `backup` as sub folder + Backup string `mapstructure:"sub_backup" json:"sub_backup" yaml:"sub_backup" toml:"sub_backup"` + + // Temp represents the sub-dir for cluster negotiation. + // By default, it will use the system temporary folder + Temp string `mapstructure:"sub_temp" json:"sub_temp" yaml:"sub_temp" toml:"sub_temp"` + + // Temp represents the sub-dir for cluster negotiation. + // By default, it will use the system temporary folder + WalDir string `mapstructure:"wal_dir" json:"wal_dir" yaml:"wal_dir" toml:"wal_dir"` + + // Temp represents the sub-dir for cluster negotiation. + // By default, it will use the system temporary folder + HostDir string `mapstructure:"host_dir" json:"host_dir" yaml:"host_dir" toml:"host_dir"` + + // LimitNumberBackup represents how many backup will be keep. + LimitNumberBackup uint8 `mapstructure:"limit_number_backup" json:"limit_number_backup" yaml:"limit_number_backup" toml:"limit_number_backup"` + + // Permission represents the perission apply to folder created. + Permission os.FileMode `mapstructure:"permission" json:"permission" yaml:"permission" toml:"permission"` +} + +func (f NutsDBFolder) Validate() liberr.Error { + val := validator.New() + err := val.Struct(f) + + if e, ok := err.(*validator.InvalidValidationError); ok { + return ErrorValidateConfig.ErrorParent(e) + } + + out := ErrorValidateConfig.Error(nil) + + for _, e := range err.(validator.ValidationErrors) { + //nolint goerr113 + out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag())) + } + + if out.HasParent() { + return out + } + + return nil +} + +func (f NutsDBFolder) getDirectory(base, dir string) (string, liberr.Error) { + if f.Permission == 0 { + f.Permission = 0770 + } + + var ( + abs string + err error + ) + + if len(dir) < 1 { + return "", nil + } + + if len(base) > 0 { + dir = filepath.Join(base, dir) + } + + if abs, err = filepath.Abs(dir); err != nil { + return "", ErrorFolderCheck.ErrorParent(err) + } + + if _, err = os.Stat(abs); err != nil && !errors.Is(err, os.ErrNotExist) { + return "", ErrorFolderCheck.ErrorParent(err) + } else if err != nil { + if err = os.MkdirAll(abs, f.Permission); err != nil { + return "", ErrorFolderCreate.ErrorParent(err) + } + } + + return abs, nil +} + +func (f NutsDBFolder) GetDirectoryBase() (string, liberr.Error) { + return f.getDirectory("", f.Base) +} + +func (f NutsDBFolder) GetDirectoryData() (string, liberr.Error) { + if base, err := f.GetDirectoryBase(); err != nil { + return "", err + } else if fs, err := f.getDirectory(base, f.Data); err != nil { + return "", err + } else if fs == "" { + return f.getDirectory(base, _DefaultFolderData) + } else { + return fs, nil + } +} + +func (f NutsDBFolder) GetDirectoryBackup() (string, liberr.Error) { + if base, err := f.GetDirectoryBase(); err != nil { + return "", err + } else if fs, err := f.getDirectory(base, f.Backup); err != nil { + return "", err + } else if fs == "" { + return f.getDirectory(base, _DefaultFolderBackup) + } else { + return fs, nil + } +} + +func (f NutsDBFolder) GetDirectoryWal() (string, liberr.Error) { + if base, err := f.GetDirectoryBase(); err != nil { + return "", err + } else if fs, err := f.getDirectory(base, f.WalDir); err != nil { + return "", err + } else if fs == "" { + return f.getDirectory(base, _DefaultFolderWal) + } else { + return fs, nil + } +} + +func (f NutsDBFolder) GetDirectoryHost() (string, liberr.Error) { + if base, err := f.GetDirectoryBase(); err != nil { + return "", err + } else if fs, err := f.getDirectory(base, f.HostDir); err != nil { + return "", err + } else if fs == "" { + return f.getDirectory(base, _DefaultFolderHost) + } else { + return fs, nil + } +} + +func (f NutsDBFolder) GetDirectoryTemp() (string, liberr.Error) { + if base, err := f.GetDirectoryBase(); err != nil { + return "", err + } else if fs, err := f.getDirectory(base, f.Temp); err != nil { + return "", err + } else if fs == "" { + return f.getDirectory("", os.TempDir()) + } else { + return fs, nil + } +} diff --git a/nutsdb/entryKv.go b/nutsdb/entryKv.go new file mode 100644 index 0000000..cd8a40a --- /dev/null +++ b/nutsdb/entryKv.go @@ -0,0 +1,242 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "fmt" + "reflect" + "runtime" + + "github.com/fxamacker/cbor/v2" + liberr "github.com/nabbar/golib/errors" + liblog "github.com/nabbar/golib/logger" + "github.com/xujiajun/nutsdb" +) + +const ( + _MinSkipCaller = 2 +) + +type CommandRequest struct { + Cmd CmdCode `mapstructure:"cmd" json:"cmd" yaml:"cmd" toml:"cmd" cbor:"cmd"` + Params []interface{} `mapstructure:"params" json:"params" yaml:"params" toml:"params" cbor:"params"` +} + +type CommandResponse struct { + Error error `mapstructure:"error" json:"error" yaml:"error" toml:"error" cbor:"error"` + Value []interface{} `mapstructure:"value" json:"value" yaml:"value" toml:"value" cbor:"value"` +} + +func NewCommand() *CommandRequest { + return &CommandRequest{} +} + +func NewCommandByDecode(p []byte) (*CommandRequest, liberr.Error) { + d := CommandRequest{} + + if e := cbor.Unmarshal(p, &d); e != nil { + return nil, ErrorCommandUnmarshal.ErrorParent(e) + } + + return &d, nil +} + +func NewCommandByCaller(params ...interface{}) *CommandRequest { + pc := make([]uintptr, 10) // at least 1 entry needed + runtime.Callers(_MinSkipCaller, pc) + f := runtime.FuncForPC(pc[0]) + + d := &CommandRequest{} + d.Cmd = CmdCodeFromName(f.Name()) + + if len(params) > 0 { + d.Params = params + } + + return d +} + +func (c *CommandRequest) InitParams(num int) { + c.Params = make([]interface{}, num) +} + +func (c *CommandRequest) InitParamsCounter(num int) int { + c.InitParams(num) + return 0 +} + +func (c *CommandRequest) SetParams(num int, val interface{}) { + if num < len(c.Params) { + c.Params[num] = val + return + } + + tmp := c.Params + c.Params = make([]interface{}, len(c.Params)+1) + + if len(tmp) > 0 { + for i := 0; i < len(tmp); i++ { + c.Params[i] = tmp[i] + } + } + + c.Params[num] = val +} + +func (c *CommandRequest) SetParamsInc(num int, val interface{}) int { + c.SetParams(num, val) + num++ + return num +} + +func (c *CommandRequest) EncodeRequest() ([]byte, liberr.Error) { + if p, e := cbor.Marshal(c); e != nil { + return nil, ErrorCommandMarshal.ErrorParent(e) + } else { + return p, nil + } +} + +func (c *CommandRequest) DecodeResult(p []byte) (*CommandResponse, liberr.Error) { + res := CommandResponse{} + + if e := cbor.Unmarshal(p, &res); e != nil { + return nil, ErrorCommandResultUnmarshal.ErrorParent(e) + } else { + return &res, nil + } +} + +func (c *CommandRequest) RunLocal(tx *nutsdb.Tx) (*CommandResponse, liberr.Error) { + if tx == nil { + return nil, ErrorTransactionClosed.Error(nil) + } + + if c.Cmd == CmdUnknown { + return nil, ErrorClientCommandInvalid.Error(nil) + } + + valTx := reflect.ValueOf(tx) + mtName := c.Cmd.Name() + method := valTx.MethodByName(mtName) + nbPrm := method.Type().NumIn() + + if len(c.Params) != nbPrm { + //nolint #goerr113 + return nil, ErrorClientCommandParamsBadNumber.ErrorParent(fmt.Errorf("%s need %d parameters", c.Cmd.Name(), nbPrm)) + } + + params := make([]reflect.Value, nbPrm) + for i := 0; i < nbPrm; i++ { + v := reflect.ValueOf(c.Params[i]) + liblog.DebugLevel.Logf("Param %d : type %s - Val %v", i, v.Type().Name(), v.Interface()) + + if v.Type().Kind() == method.Type().In(i).Kind() { + params[i] = v + continue + } + + if !v.Type().ConvertibleTo(method.Type().In(i)) { + //nolint #goerr113 + return nil, ErrorClientCommandParamsMismatching.ErrorParent(fmt.Errorf("cmd: %s", mtName), fmt.Errorf("param num: %d", i), fmt.Errorf("param type: %s, avaitting type: %s", v.Type().Kind(), method.Type().In(i).Kind())) + } + + //nolint #exhaustive + switch method.Type().In(i).Kind() { + case reflect.Bool: + params[i] = reflect.ValueOf(v.Bool()) + case reflect.Int: + params[i] = reflect.ValueOf(int(v.Int())) + case reflect.Int8: + params[i] = reflect.ValueOf(int8(v.Int())) + case reflect.Int16: + params[i] = reflect.ValueOf(int8(v.Int())) + case reflect.Int32: + params[i] = reflect.ValueOf(int16(v.Int())) + case reflect.Int64: + params[i] = reflect.ValueOf(v.Int()) + case reflect.Uintptr: + params[i] = reflect.ValueOf(v.UnsafeAddr()) + case reflect.Uint: + params[i] = reflect.ValueOf(uint(v.Uint())) + case reflect.Uint8: + params[i] = reflect.ValueOf(uint8(v.Uint())) + case reflect.Uint16: + params[i] = reflect.ValueOf(uint16(v.Uint())) + case reflect.Uint32: + params[i] = reflect.ValueOf(uint32(v.Uint())) + case reflect.Uint64: + params[i] = reflect.ValueOf(v.Uint()) + case reflect.Float32: + params[i] = reflect.ValueOf(float32(v.Float())) + case reflect.Float64: + params[i] = reflect.ValueOf(v.Float()) + case reflect.Complex64: + params[i] = reflect.ValueOf(complex64(v.Complex())) + case reflect.Complex128: + params[i] = reflect.ValueOf(v.Complex()) + case reflect.Interface: + params[i] = reflect.ValueOf(v.Interface()) + case reflect.String: + params[i] = reflect.ValueOf(v.String()) + } + + liblog.DebugLevel.Logf("Change Param %d : type %s to %v", i, v.Type().Name(), params[i].Type().Name()) + } + + resp := method.Call(params) + ret := CommandResponse{ + Error: nil, + Value: make([]interface{}, 0), + } + + for i := 0; i < len(resp); i++ { + v := resp[i].Interface() + if e, ok := v.(error); ok { + ret.Error = e + } else { + ret.Value = append(ret.Value, v) + } + } + + if ret.Error == nil && len(ret.Value) < 1 { + return nil, nil + } + + return &ret, nil +} + +func (c *CommandRequest) Run(tx *nutsdb.Tx) ([]byte, liberr.Error) { + if r, err := c.RunLocal(tx); err != nil { + return nil, err + } else if p, e := cbor.Marshal(r); e != nil { + return nil, ErrorCommandResultMarshal.ErrorParent(e) + } else { + return p, nil + } +} diff --git a/nutsdb/errors.go b/nutsdb/errors.go new file mode 100644 index 0000000..b763bc5 --- /dev/null +++ b/nutsdb/errors.go @@ -0,0 +1,150 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import liberr "github.com/nabbar/golib/errors" + +const ( + ErrorParamsEmpty liberr.CodeError = iota + liberr.MinPkgNutsDB + ErrorParamsMissing + ErrorParamsMismatching + ErrorValidateConfig + ErrorValidateNutsDB + ErrorClusterInit + ErrorFolderCheck + ErrorFolderCreate + ErrorFolderCopy + ErrorFolderDelete + ErrorFolderExtract + ErrorFolderArchive + ErrorFolderCompress + ErrorDatabaseClosed + ErrorDatabaseKeyInvalid + ErrorDatabaseBackup + ErrorDatabaseSnapshot + ErrorTransactionInit + ErrorTransactionClosed + ErrorTransactionCommit + ErrorTransactionPutKey + ErrorCommandInvalid + ErrorCommandUnmarshal + ErrorCommandMarshal + ErrorCommandResultUnmarshal + ErrorCommandResultMarshal + ErrorLogEntryAdd + ErrorClientCommandInvalid + ErrorClientCommandParamsBadNumber + ErrorClientCommandParamsMismatching + ErrorClientCommandCall + ErrorClientCommandCommit + ErrorClientCommandResponseInvalid +) + +var isCodeError = false + +func IsCodeError() bool { + return isCodeError +} + +func init() { + isCodeError = liberr.ExistInMapMessage(ErrorParamsEmpty) + liberr.RegisterIdFctMessage(ErrorParamsEmpty, getMessage) +} + +func getMessage(code liberr.CodeError) (message string) { + switch code { + case liberr.UNK_ERROR: + return "" + case ErrorParamsEmpty: + return "at least on given parameters is empty" + case ErrorParamsMissing: + return "at least on given parameters is missing" + case ErrorParamsMismatching: + return "at least on given parameters is mismatching awaiting type" + case ErrorValidateConfig: + return "config seems to be invalid" + case ErrorValidateNutsDB: + return "database config seems to be invalid" + case ErrorClusterInit: + return "cannot start or join cluster" + case ErrorFolderCheck: + return "error while trying to check or stat folder" + case ErrorFolderCreate: + return "error while trying to create folder" + case ErrorFolderCopy: + return "error while trying to copy folder" + case ErrorFolderArchive: + return "error while trying to archive folder" + case ErrorFolderCompress: + return "error while trying to compress folder" + case ErrorFolderExtract: + return "error while trying to extract snapshot archive" + case ErrorDatabaseClosed: + return "database is closed" + case ErrorDatabaseKeyInvalid: + return "database key seems to be invalid" + case ErrorDatabaseBackup: + return "error occurs while trying to backup database folder" + case ErrorDatabaseSnapshot: + return "error occurs while trying to backup database to cluster members" + case ErrorTransactionInit: + return "cannot initialize new transaction from database" + case ErrorTransactionClosed: + return "transaction is closed" + case ErrorTransactionCommit: + return "cannot commit transaction writable into database" + case ErrorTransactionPutKey: + return "cannot send Put command into database transaction" + case ErrorCommandInvalid: + return "given query is not a valid DB command" + case ErrorCommandUnmarshal: + return "cannot unmarshall DB command" + case ErrorCommandMarshal: + return "cannot marshall DB command" + case ErrorCommandResultUnmarshal: + return "cannot unmarshall DB command result" + case ErrorCommandResultMarshal: + return "cannot marshall DB command result" + case ErrorLogEntryAdd: + return "cannot add key/value to database" + case ErrorClientCommandInvalid: + return "invalid command" + case ErrorClientCommandParamsBadNumber: + return "invalid number of parameters for client command" + case ErrorClientCommandParamsMismatching: + return "invalid type of parameter for client command" + case ErrorClientCommandCall: + return "error occurs while running client command" + case ErrorClientCommandCommit: + return "error occurs while commit client command" + case ErrorClientCommandResponseInvalid: + return "response of requested client command seems to be invalid" + } + + return "" +} diff --git a/nutsdb/interface.go b/nutsdb/interface.go new file mode 100644 index 0000000..7ac578b --- /dev/null +++ b/nutsdb/interface.go @@ -0,0 +1,65 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "context" + "sync/atomic" + "time" + + libclu "github.com/nabbar/golib/cluster" + liberr "github.com/nabbar/golib/errors" +) + +type NutsDB interface { + Listen() liberr.Error + Restart() liberr.Error + Shutdown() liberr.Error + + ForceRestart() + ForceShutdown() + + IsRunning() bool + IsReady(ctx context.Context) bool + WaitReady(ctx context.Context, tick time.Duration) + + //StatusInfo() (name string, release string, hash string) + //StatusHealth() error + //StatusRoute(prefix string, fctMessage status.FctMessage, sts status.RouteStatus) + + Cluster() libclu.Cluster + Client(ctx context.Context, tickSync time.Duration) Client +} + +func New(c Config) NutsDB { + return &ndb{ + c: c, + t: new(atomic.Value), + r: new(atomic.Value), + } +} diff --git a/nutsdb/model.go b/nutsdb/model.go new file mode 100644 index 0000000..7c79f70 --- /dev/null +++ b/nutsdb/model.go @@ -0,0 +1,203 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "context" + "sync/atomic" + "time" + + dgbstm "github.com/lni/dragonboat/v3/statemachine" + libclu "github.com/nabbar/golib/cluster" + liberr "github.com/nabbar/golib/errors" +) + +type ndb struct { + c Config + t *atomic.Value + r *atomic.Value +} + +func (n *ndb) createNodeMachine(node uint64, cluster uint64) dgbstm.IOnDiskStateMachine { + var ( + err liberr.Error + opt Options + ) + + if opt, err = n.c.GetOptions(); err != nil { + panic(err) + } + + return newNode(node, cluster, opt, n.setRunning) +} + +func (n *ndb) newCluster() liberr.Error { + var ( + clu libclu.Cluster + err liberr.Error + cfg libclu.Config + ) + + if i := n.t.Load(); i != nil { + if err = n.Shutdown(); err != nil { + return err + } + } + + if cfg, err = n.c.GetConfigCluster(); err != nil { + return err + } + + clu, err = libclu.NewCluster(cfg, nil) + + if err != nil { + return err + } + + clu.SetFctCreateSTMOnDisk(n.createNodeMachine) + n.t.Store(clu) + return nil +} + +func (n *ndb) IsRunning() bool { + if i := n.r.Load(); i == nil { + return false + } else if b, ok := i.(bool); !ok { + return false + } else { + return b + } +} + +func (n *ndb) setRunning(state bool) { + if n == nil || n.r == nil { + return + } else { + n.r.Store(state) + } +} + +func (n *ndb) IsReady(ctx context.Context) bool { + if m, e := n.Cluster().SyncGetClusterMembership(ctx); e != nil || m == nil || len(m.Nodes) < 1 { + return false + } + + if _, ok, e := n.Cluster().GetLeaderID(); e != nil || !ok { + return false + } else { + return true + } +} + +func (n *ndb) WaitReady(ctx context.Context, tick time.Duration) { + for { + if n.IsRunning() && n.IsReady(ctx) { + return + } + + time.Sleep(tick) + } +} + +func (n *ndb) Listen() liberr.Error { + var ( + c libclu.Cluster + e liberr.Error + ) + + if c = n.Cluster(); c == nil { + if e = n.newCluster(); e != nil { + return e + } else if c = n.Cluster(); c == nil { + return ErrorClusterInit.Error(nil) + } + } + + if e = c.ClusterStart(len(n.c.Cluster.InitMember) < 1); e != nil { + return e + } + + n.t.Store(c) + return nil +} + +func (n *ndb) Restart() liberr.Error { + return n.Listen() +} + +func (n *ndb) ForceRestart() { + n.ForceShutdown() + _ = n.Listen() +} + +func (n *ndb) Shutdown() liberr.Error { + if i := n.t.Load(); i == nil { + return nil + } else if c, ok := i.(libclu.Cluster); !ok { + return nil + } else if !c.HasNodeInfo(0) { + return nil + } else if err := c.NodeStop(0); err != nil { + return err + } else { + return nil + } +} + +func (n *ndb) ForceShutdown() { + if err := n.Shutdown(); err == nil { + return + } else if i := n.t.Load(); i == nil { + return + } else if c, ok := i.(libclu.Cluster); !ok { + return + } else if !c.HasNodeInfo(0) { + return + } else { + _ = c.ClusterStop(true) + } +} + +func (n *ndb) Cluster() libclu.Cluster { + if i := n.t.Load(); i == nil { + return nil + } else if c, ok := i.(libclu.Cluster); !ok { + return nil + } else { + return c + } +} + +func (n *ndb) Client(ctx context.Context, tickSync time.Duration) Client { + return &clientNutDB{ + x: ctx, + t: tickSync, + c: n.Cluster, + w: n.WaitReady, + } +} diff --git a/nutsdb/node.go b/nutsdb/node.go new file mode 100644 index 0000000..6c0e8c5 --- /dev/null +++ b/nutsdb/node.go @@ -0,0 +1,354 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "errors" + "io" + "strings" + "sync/atomic" + + dgbstm "github.com/lni/dragonboat/v3/statemachine" + liberr "github.com/nabbar/golib/errors" + "github.com/xujiajun/nutsdb" +) + +const ( + _RaftBucket = "_raft" + _RaftKeyAppliedIndex = "_raft_applied_index" + _WordByteTrue = 0xff +) + +func newNode(node uint64, cluster uint64, opt Options, fct func(state bool)) dgbstm.IOnDiskStateMachine { + if fct == nil { + fct = func(state bool) {} + } + + return &nutsNode{ + n: node, + c: cluster, + o: opt, + r: fct, + d: new(atomic.Value), + } +} + +type nutsNode struct { + n uint64 // nodeId + c uint64 // clusterId + o Options // options nutsDB + r func(state bool) // is running + d *atomic.Value // nutsDB database pointer +} + +func (n *nutsNode) setRunning(state bool) { + if n != nil && n.r != nil { + n.r(state) + } +} + +func (n *nutsNode) newTx(writable bool) (*nutsdb.Tx, liberr.Error) { + if n == nil || n.d == nil { + return nil, ErrorDatabaseClosed.Error(nil) + } + if i := n.d.Load(); i == nil { + return nil, ErrorDatabaseClosed.Error(nil) + } else if db, ok := i.(*nutsdb.DB); !ok { + return nil, ErrorDatabaseClosed.Error(nil) + } else if tx, e := db.Begin(writable); e != nil { + return nil, ErrorTransactionInit.ErrorParent(e) + } else { + return tx, nil + } +} + +func (n *nutsNode) getRaftLogIndexLastApplied() (idxRaftlog uint64, err error) { + var ( + tx *nutsdb.Tx + en *nutsdb.Entry + ) + + if tx, err = n.newTx(false); err != nil { + return 0, err + } + + defer func() { + _ = tx.Rollback() + }() + + if en, err = tx.Get(_RaftBucket, []byte(_RaftKeyAppliedIndex)); err != nil && errors.Is(err, nutsdb.ErrBucketNotFound) { + return 0, nil + } else if err != nil && errors.Is(err, nutsdb.ErrBucketEmpty) { + return 0, nil + } else if err != nil && errors.Is(err, nutsdb.ErrNotFoundKey) { + return 0, nil + } else if err != nil && errors.Is(err, nutsdb.ErrKeyNotFound) { + return 0, nil + } else if err != nil && errors.Is(err, nutsdb.ErrKeyEmpty) { + return 0, nil + } else if err != nil && strings.HasPrefix(err.Error(), "not found bucket") { + return 0, nil + } else if err != nil { + return 0, err + } else if en.IsZero() { + return 0, nil + } else { + return n.btoi64(en.Value), nil + } +} + +func (n *nutsNode) i64tob(val uint64) []byte { + r := make([]byte, 8) + for i := uint64(0); i < 8; i++ { + r[i] = byte((val >> (i * 8)) & _WordByteTrue) + } + return r +} + +func (n *nutsNode) btoi64(val []byte) uint64 { + r := uint64(0) + for i := uint64(0); i < 8; i++ { + r |= uint64(val[i]) << (8 * i) + } + return r +} + +func (n *nutsNode) applyRaftLogIndexLastApplied(idx uint64) error { + var ( + e error + tx *nutsdb.Tx + err liberr.Error + ) + + if tx, err = n.newTx(true); err != nil { + return err + } + + defer func() { + _ = tx.Rollback() + }() + + if e = tx.Put(_RaftBucket, []byte(_RaftKeyAppliedIndex), n.i64tob(idx), 0); e != nil { + return ErrorTransactionPutKey.ErrorParent(e) + } else if e = tx.Commit(); e != nil { + return ErrorTransactionCommit.ErrorParent(e) + } else { + return nil + } +} + +// Open @TODO : analyze channel role !! +func (n *nutsNode) Open(stopc <-chan struct{}) (idxRaftlog uint64, err error) { + var db *nutsdb.DB + + if db, err = nutsdb.Open(n.o.NutsDBOptions()); err != nil { + return 0, err + } else { + n.d.Store(db) + n.setRunning(true) + + if idxRaftlog, err = n.getRaftLogIndexLastApplied(); err != nil { + _ = n.Close() + } + + return + } +} + +func (n *nutsNode) Close() error { + defer n.setRunning(false) + + if n == nil || n.d == nil { + + return nil + } else if i := n.d.Load(); i == nil { + return nil + } else if db, ok := i.(*nutsdb.DB); !ok { + return nil + } else { + return db.Close() + } +} + +func (n *nutsNode) Update(logEntry []dgbstm.Entry) ([]dgbstm.Entry, error) { + var ( + e error + + tx *nutsdb.Tx + kv *CommandRequest + + err liberr.Error + res []byte + idx int + ent dgbstm.Entry + ) + + if tx, err = n.newTx(true); err != nil { + return nil, err + } + + for idx, ent = range logEntry { + if kv, err = NewCommandByDecode(ent.Cmd); err != nil { + logEntry[idx].Result = dgbstm.Result{ + Value: 0, + Data: nil, + } + + _ = tx.Rollback() + return logEntry, err + } else if res, err = kv.Run(tx); err != nil { + logEntry[idx].Result = dgbstm.Result{ + Value: 0, + Data: nil, + } + + _ = tx.Rollback() + return logEntry, err + } else { + logEntry[idx].Result = dgbstm.Result{ + Value: uint64(idx), + Data: res, + } + } + } + + if e = tx.Commit(); e != nil { + _ = tx.Rollback() + return logEntry, ErrorTransactionCommit.ErrorParent(e) + } + + return logEntry, n.applyRaftLogIndexLastApplied(logEntry[len(logEntry)-1].Index) +} + +func (n *nutsNode) Lookup(query interface{}) (value interface{}, err error) { + var ( + t *nutsdb.Tx + r *CommandResponse + c *CommandRequest + e liberr.Error + + ok bool + ) + + if t, e = n.newTx(true); e != nil { + return nil, e + } + + defer func() { + if t != nil { + _ = t.Rollback() + } + }() + + if c, ok = query.(*CommandRequest); !ok { + return nil, ErrorCommandInvalid.Error(nil) + } else if r, e = c.RunLocal(t); e != nil { + return nil, e + } else { + return r, nil + } +} + +func (n *nutsNode) Sync() error { + return nil +} + +func (n *nutsNode) PrepareSnapshot() (interface{}, error) { + if n == nil || n.d == nil { + return nil, ErrorDatabaseClosed.Error(nil) + } + + var sh = newSnap() + + if i := n.d.Load(); i == nil { + sh.Finish() + return nil, ErrorDatabaseClosed.Error(nil) + } else if db, ok := i.(*nutsdb.DB); !ok { + sh.Finish() + return nil, ErrorDatabaseClosed.Error(nil) + } else if err := sh.Prepare(n.o, db); err != nil { + sh.Finish() + return nil, ErrorDatabaseBackup.ErrorParent(err) + } else { + return sh, nil + } +} + +func (n *nutsNode) SaveSnapshot(i interface{}, writer io.Writer, c <-chan struct{}) error { + if n == nil || n.d == nil { + return ErrorDatabaseClosed.Error(nil) + } + + if i == nil { + return ErrorParamsEmpty.Error(nil) + } else if sh, ok := snapCast(i); !ok { + return ErrorParamsMismatching.Error(nil) + } else if err := sh.Save(n.o, writer); err != nil { + sh.Finish() + return err + } else { + sh.Finish() + } + + return nil +} + +func (n *nutsNode) RecoverFromSnapshot(reader io.Reader, c <-chan struct{}) error { + if n == nil || n.d == nil { + return ErrorDatabaseClosed.Error(nil) + } + + s := newSnap() + defer s.Finish() + + if err := s.Load(n.o, reader); err != nil { + return err + } + + if i := n.d.Load(); i == nil { + if err := s.Apply(n.o); err != nil { + return err + } + } else if db, ok := i.(*nutsdb.DB); !ok { + if err := s.Apply(n.o); err != nil { + return err + } + } else { + _ = db.Close() + if err := s.Apply(n.o); err != nil { + return err + } + } + + //@TODO : check channel is ok.... + if _, err := n.Open(nil); err != nil { + return err + } + + return nil +} diff --git a/nutsdb/options.go b/nutsdb/options.go new file mode 100644 index 0000000..1792cdb --- /dev/null +++ b/nutsdb/options.go @@ -0,0 +1,210 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "time" + + liberr "github.com/nabbar/golib/errors" + "github.com/xujiajun/nutsdb" + "github.com/xujiajun/utils/filesystem" +) + +type Options interface { + NutsDBOptions() nutsdb.Options + + NewBackup(db *nutsdb.DB) (string, liberr.Error) + NewBackupTemp(db *nutsdb.DB) (string, liberr.Error) + + NewTempFolder() (string, liberr.Error) + NewTempFile(extension string) (string, liberr.Error) + + CleanBackup() liberr.Error + Permission() os.FileMode + + RestoreBackup(dir string) liberr.Error +} + +func NewOptions(cfgNuts NutsDBOptions, cfgFs NutsDBFolder) (Options, liberr.Error) { + if _, err := cfgFs.GetDirectoryBase(); err != nil { + return nil, err + } + + o := &options{ + limit: cfgFs.LimitNumberBackup, + perm: cfgFs.Permission, + } + + if fs, err := cfgFs.GetDirectoryData(); err != nil { + return nil, err + } else { + o.dirs.data = fs + } + + if fs, err := cfgFs.GetDirectoryBackup(); err != nil { + return nil, err + } else { + o.dirs.backup = fs + } + + if fs, err := cfgFs.GetDirectoryTemp(); err != nil { + return nil, err + } else { + o.dirs.temp = fs + } + + o.nuts = cfgNuts.GetNutsDBOptions(o.dirs.data) + + return o, nil +} + +type options struct { + nuts nutsdb.Options + dirs struct { + data string + backup string + temp string + } + limit uint8 + perm os.FileMode +} + +func (o options) NutsDBOptions() nutsdb.Options { + return o.nuts +} + +func (o options) NewBackup(db *nutsdb.DB) (string, liberr.Error) { + fld := o.getBackupDirName() + + if e := os.MkdirAll(filepath.Join(o.dirs.backup, fld), o.perm); e != nil { + return "", ErrorFolderCreate.ErrorParent(e) + } else if err := o.newBackupDir(fld, db); err != nil { + return "", err + } else { + return fld, nil + } +} + +func (o options) NewBackupTemp(db *nutsdb.DB) (string, liberr.Error) { + if fld, err := o.NewTempFolder(); err != nil { + return "", err + } else if err = o.newBackupDir(fld, db); err != nil { + return "", err + } else { + return fld, nil + } +} + +func (o options) NewTempFolder() (string, liberr.Error) { + if p, e := ioutil.TempDir(o.dirs.temp, o.getTempPrefix()); e != nil { + return "", ErrorFolderCreate.ErrorParent(e) + } else { + _ = os.Chmod(p, o.perm) + return p, nil + } +} + +func (o options) NewTempFile(extension string) (string, liberr.Error) { + pattern := o.getTempPrefix() + "-*" + + if extension != "" { + pattern = pattern + "." + extension + } + + if file, e := ioutil.TempFile(o.dirs.temp, pattern); e != nil { + return "", ErrorFolderCreate.ErrorParent(e) + } else { + p := file.Name() + _ = file.Close() + + return p, nil + } +} + +func (o options) CleanBackup() liberr.Error { + panic("implement me") +} + +func (o options) Permission() os.FileMode { + return o.perm +} + +func (o options) RestoreBackup(dir string) liberr.Error { + if err := os.RemoveAll(o.dirs.data); err != nil { + return ErrorFolderDelete.ErrorParent(err) + } else if err = filesystem.CopyDir(dir, o.dirs.data); err != nil { + return ErrorFolderCopy.ErrorParent(err) + } else { + _ = os.Chmod(o.dirs.data, o.perm) + } + + return nil +} + +/// private + +func (o options) newBackupDir(dir string, db *nutsdb.DB) liberr.Error { + if err := db.Backup(dir); err != nil { + return ErrorDatabaseBackup.ErrorParent(err) + } + + return nil +} + +func (o options) getTempPrefix() string { + b := make([]byte, 64) + + b = b[:runtime.Stack(b, false)] + b = bytes.TrimPrefix(b, []byte("goroutine ")) + b = b[:bytes.IndexByte(b, ' ')] + + //nolint #nosec + /* #nosec */ + n, _ := strconv.ParseUint(string(b), 10, 64) + + return fmt.Sprintf("%d", n) +} + +func (o options) getBackupDirName() string { + part := strings.Split(time.Now().Format(time.RFC3339), "T") + dt := part[0] + + part = strings.Split(part[1], "Z") + tm := strings.Replace(part[0], ":", "-", -1) + tz := strings.Replace(part[1], ":", "", -1) + + return dt + "T" + tm + "Z" + tz +} diff --git a/nutsdb/snap.go b/nutsdb/snap.go new file mode 100644 index 0000000..bdd3521 --- /dev/null +++ b/nutsdb/snap.go @@ -0,0 +1,175 @@ +/*********************************************************************************************************************** + * + * MIT License + * + * Copyright (c) 2021 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + **********************************************************************************************************************/ + +package nutsdb + +import ( + "io" + "os" + "path" + + "github.com/nabbar/golib/archive" + liberr "github.com/nabbar/golib/errors" + "github.com/nabbar/golib/ioutils" + "github.com/xujiajun/nutsdb" +) + +type Snapshot interface { + Prepare(opt Options, db *nutsdb.DB) liberr.Error + + Save(opt Options, writer io.Writer) liberr.Error + Load(opt Options, reader io.Reader) liberr.Error + + Apply(opt Options) liberr.Error + + Finish() +} + +func newSnap() Snapshot { + return &snap{} +} + +func snapCast(i interface{}) (Snapshot, bool) { + if sp, ok := i.(*snap); ok { + return sp, true + } else if ss, ok := i.(snap); ok { + return &ss, true + } else if si, ok := i.(Snapshot); ok { + return si, true + } else { + return nil, false + } +} + +type snap struct { + path string +} + +func (s *snap) Prepare(opt Options, db *nutsdb.DB) liberr.Error { + if dir, err := opt.NewBackupTemp(db); err != nil { + return err + } else { + s.path = dir + } + + return nil +} + +func (s *snap) Save(opt Options, writer io.Writer) liberr.Error { + var ( + tar string + err error + + t ioutils.FileProgress + e liberr.Error + ) + + if tar, e = opt.NewTempFile("tar"); e != nil { + return e + } + + defer func() { + _ = os.Remove(tar) + }() + + if t, e = ioutils.NewFileProgressPathWrite(tar, false, true, 0664); e != nil { + return e + } + + defer func() { + _ = t.Close() + }() + + if _, e = archive.CreateArchive(archive.ArchiveTypeTarGzip, t, s.path); e != nil { + return ErrorFolderArchive.Error(e) + } + + if _, err = t.Seek(0, io.SeekStart); err != nil { + return ErrorDatabaseSnapshot.ErrorParent(err) + } + + if _, err = t.WriteTo(writer); err != nil { + return ErrorDatabaseSnapshot.ErrorParent(err) + } + + return nil +} + +func (s *snap) Load(opt Options, reader io.Reader) liberr.Error { + var ( + arc string + out string + err error + + a ioutils.FileProgress + e liberr.Error + ) + + if arc, e = opt.NewTempFile("tar.gz"); e != nil { + return e + } + + defer func() { + _ = os.Remove(arc) + }() + + if a, e = ioutils.NewFileProgressPathWrite(arc, false, true, 0664); e != nil { + return e + } + + defer func() { + _ = a.Close() + }() + + if _, err = a.ReadFrom(reader); err != nil { + return ErrorDatabaseSnapshot.ErrorParent(err) + } + + if out, e = opt.NewTempFolder(); e != nil { + return e + } + + if e = archive.ExtractAll(a, path.Base(arc), out, opt.Permission()); e != nil { + return ErrorFolderExtract.Error(e) + } + + s.path = out + + return nil +} + +func (s *snap) Apply(opt Options) liberr.Error { + if e := opt.RestoreBackup(s.path); e != nil { + return ErrorDatabaseSnapshot.Error(e) + } + + return nil +} + +func (s *snap) Finish() { + _ = os.RemoveAll(s.path) +}