mirror of
https://github.com/asdine/storm.git
synced 2025-09-26 19:01:14 +08:00
Refactor Open and DB
This commit is contained in:
@@ -16,9 +16,9 @@ func TestBucket(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
require.Nil(t, db.root.GetBucket(readTx, "none"))
|
||||
require.Nil(t, db.GetBucket(readTx, "none"))
|
||||
|
||||
b, err := db.root.CreateBucketIfNotExists(readTx, "new")
|
||||
b, err := db.CreateBucketIfNotExists(readTx, "new")
|
||||
|
||||
// Cannot create buckets in a read transaction
|
||||
require.Error(t, err)
|
||||
@@ -36,9 +36,9 @@ func TestBucket(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
require.Nil(t, db.root.GetBucket(writeTx, "none"))
|
||||
require.Nil(t, db.GetBucket(writeTx, "none"))
|
||||
|
||||
b, err = db.root.CreateBucketIfNotExists(writeTx, "new")
|
||||
b, err = db.CreateBucketIfNotExists(writeTx, "new")
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, b)
|
||||
@@ -59,8 +59,8 @@ func TestBucket(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
require.NotNil(t, db.root.GetBucket(readTx, "new"))
|
||||
require.Nil(t, db.root.GetBucket(readTx, "c"))
|
||||
require.NotNil(t, db.GetBucket(readTx, "new"))
|
||||
require.Nil(t, db.GetBucket(readTx, "c"))
|
||||
require.NotNil(t, n2.GetBucket(readTx, "c"))
|
||||
|
||||
readTx.Rollback()
|
||||
|
@@ -17,13 +17,13 @@ func TestNode(t *testing.T) {
|
||||
node1, ok := n1.(*node)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, db, node1.s)
|
||||
require.NotEqual(t, db.root, n1)
|
||||
require.Equal(t, []string{"a"}, db.root.rootBucket)
|
||||
require.Equal(t, []string{"b", "c"}, node1.rootBucket)
|
||||
require.NotEqual(t, db.Node, n1)
|
||||
require.Equal(t, []string{"a"}, db.Node.(*node).rootBucket)
|
||||
require.Equal(t, []string{"a", "b", "c"}, node1.rootBucket)
|
||||
n2 := n1.From("d", "e")
|
||||
node2, ok := n2.(*node)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, []string{"b", "c", "d", "e"}, node2.rootBucket)
|
||||
require.Equal(t, []string{"a", "b", "c", "d", "e"}, node2.rootBucket)
|
||||
}
|
||||
|
||||
func TestNodeWithTransaction(t *testing.T) {
|
||||
|
63
options.go
63
options.go
@@ -9,44 +9,47 @@ import (
|
||||
)
|
||||
|
||||
// BoltOptions used to pass options to BoltDB.
|
||||
func BoltOptions(mode os.FileMode, options *bolt.Options) func(*DB) error {
|
||||
return func(d *DB) error {
|
||||
d.boltMode = mode
|
||||
d.boltOptions = options
|
||||
func BoltOptions(mode os.FileMode, options *bolt.Options) func(*Options) error {
|
||||
return func(opts *Options) error {
|
||||
opts.boltMode = mode
|
||||
opts.boltOptions = options
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Codec used to set a custom encoder and decoder. The default is JSON.
|
||||
func Codec(c codec.MarshalUnmarshaler) func(*DB) error {
|
||||
return func(d *DB) error {
|
||||
d.codec = c
|
||||
func Codec(c codec.MarshalUnmarshaler) func(*Options) error {
|
||||
return func(opts *Options) error {
|
||||
opts.codec = c
|
||||
// d.root.codec = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Batch enables the use of batch instead of update for read-write transactions.
|
||||
func Batch() func(*DB) error {
|
||||
return func(d *DB) error {
|
||||
d.batchMode = true
|
||||
func Batch() func(*Options) error {
|
||||
return func(opts *Options) error {
|
||||
opts.batchMode = true
|
||||
// d.root.batchMode = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Root used to set the root bucket. See also the From method.
|
||||
func Root(root ...string) func(*DB) error {
|
||||
return func(d *DB) error {
|
||||
d.rootBucket = root
|
||||
func Root(root ...string) func(*Options) error {
|
||||
return func(opts *Options) error {
|
||||
opts.rootBucket = root
|
||||
// d.root.rootBucket = root
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// UseDB allow Storm to use an existing open Bolt.DB.
|
||||
// UseDB allows Storm to use an existing open Bolt.DB.
|
||||
// Warning: storm.DB.Close() will close the bolt.DB instance.
|
||||
func UseDB(b *bolt.DB) func(*DB) error {
|
||||
return func(d *DB) error {
|
||||
d.Path = b.Path()
|
||||
d.Bolt = b
|
||||
func UseDB(b *bolt.DB) func(*Options) error {
|
||||
return func(opts *Options) error {
|
||||
opts.path = b.Path()
|
||||
opts.bolt = b
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -71,3 +74,27 @@ func Reverse() func(*index.Options) {
|
||||
opts.Reverse = true
|
||||
}
|
||||
}
|
||||
|
||||
// Options are used to customize the way Storm opens a database.
|
||||
type Options struct {
|
||||
// Handles encoding and decoding of objects
|
||||
codec codec.MarshalUnmarshaler
|
||||
|
||||
// Bolt file mode
|
||||
boltMode os.FileMode
|
||||
|
||||
// Bolt options
|
||||
boltOptions *bolt.Options
|
||||
|
||||
// Enable batch mode for read-write transaction, instead of update mode
|
||||
batchMode bool
|
||||
|
||||
// The root bucket name
|
||||
rootBucket []string
|
||||
|
||||
// Path of the database file
|
||||
path string
|
||||
|
||||
// Bolt is still easily accessible
|
||||
bolt *bolt.DB
|
||||
}
|
||||
|
@@ -154,7 +154,7 @@ func TestSave(t *testing.T) {
|
||||
val := bucket.Get(i)
|
||||
require.NotNil(t, val)
|
||||
|
||||
content, err := db.codec.Marshal(&v)
|
||||
content, err := db.Codec().Marshal(&v)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, content, val)
|
||||
return nil
|
||||
@@ -347,7 +347,7 @@ func TestSaveDifferentBucketRoot(t *testing.T) {
|
||||
db, cleanup := createDB(t)
|
||||
defer cleanup()
|
||||
|
||||
require.Len(t, db.rootBucket, 0)
|
||||
require.Len(t, db.Node.(*node).rootBucket, 0)
|
||||
|
||||
dbSub := db.From("sub").(*node)
|
||||
|
||||
|
249
storm.go
249
storm.go
@@ -3,13 +3,10 @@ package storm
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/asdine/storm/codec"
|
||||
"github.com/asdine/storm/codec/json"
|
||||
"github.com/asdine/storm/index"
|
||||
"github.com/asdine/storm/q"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
@@ -22,92 +19,65 @@ const (
|
||||
var defaultCodec = json.Codec
|
||||
|
||||
// Open opens a database at the given path with optional Storm options.
|
||||
func Open(path string, stormOptions ...func(*DB) error) (*DB, error) {
|
||||
func Open(path string, stormOptions ...func(*Options) error) (*DB, error) {
|
||||
var err error
|
||||
|
||||
s := &DB{
|
||||
Path: path,
|
||||
codec: defaultCodec,
|
||||
}
|
||||
|
||||
var opts Options
|
||||
for _, option := range stormOptions {
|
||||
if err = option(s); err != nil {
|
||||
if err = option(&opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if s.boltMode == 0 {
|
||||
s.boltMode = 0600
|
||||
s := DB{
|
||||
Bolt: opts.bolt,
|
||||
}
|
||||
|
||||
if s.boltOptions == nil {
|
||||
s.boltOptions = &bolt.Options{Timeout: 1 * time.Second}
|
||||
n := node{
|
||||
s: &s,
|
||||
codec: opts.codec,
|
||||
batchMode: opts.batchMode,
|
||||
rootBucket: opts.rootBucket,
|
||||
}
|
||||
|
||||
s.root = &node{s: s, rootBucket: s.rootBucket, codec: s.codec, batchMode: s.batchMode}
|
||||
if n.codec == nil {
|
||||
n.codec = defaultCodec
|
||||
}
|
||||
|
||||
if opts.boltMode == 0 {
|
||||
opts.boltMode = 0600
|
||||
}
|
||||
|
||||
if opts.boltOptions == nil {
|
||||
opts.boltOptions = &bolt.Options{Timeout: 1 * time.Second}
|
||||
}
|
||||
|
||||
s.Node = &n
|
||||
|
||||
// skip if UseDB option is used
|
||||
if s.Bolt == nil {
|
||||
s.Bolt, err = bolt.Open(path, s.boltMode, s.boltOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.checkVersion()
|
||||
s.Bolt, err = bolt.Open(path, opts.boltMode, opts.boltOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
err = s.checkVersion()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
// DB is the wrapper around BoltDB. It contains an instance of BoltDB and uses it to perform all the
|
||||
// needed operations
|
||||
type DB struct {
|
||||
// Path of the database file
|
||||
Path string
|
||||
|
||||
// Handles encoding and decoding of objects
|
||||
codec codec.MarshalUnmarshaler
|
||||
// The root node that points to the root bucket.
|
||||
Node
|
||||
|
||||
// Bolt is still easily accessible
|
||||
Bolt *bolt.DB
|
||||
|
||||
// Bolt file mode
|
||||
boltMode os.FileMode
|
||||
|
||||
// Bolt options
|
||||
boltOptions *bolt.Options
|
||||
|
||||
// The root node that points to the root bucket.
|
||||
root *node
|
||||
|
||||
// The root bucket name
|
||||
rootBucket []string
|
||||
|
||||
// Enable batch mode for read-write transaction, instead of update mode
|
||||
batchMode bool
|
||||
}
|
||||
|
||||
// From returns a new Storm node with a new bucket root.
|
||||
// All DB operations on the new node will be executed relative to the given
|
||||
// bucket.
|
||||
func (s *DB) From(root ...string) Node {
|
||||
newNode := *s.root
|
||||
newNode.rootBucket = root
|
||||
return &newNode
|
||||
}
|
||||
|
||||
// WithTransaction returns a New Storm node that will use the given transaction.
|
||||
func (s *DB) WithTransaction(tx *bolt.Tx) Node {
|
||||
return s.root.WithTransaction(tx)
|
||||
}
|
||||
|
||||
// Bucket returns the root bucket name as a slice.
|
||||
// In the normal, simple case this will be empty.
|
||||
func (s *DB) Bucket() []string {
|
||||
return s.root.Bucket()
|
||||
}
|
||||
|
||||
// Close the database
|
||||
@@ -115,161 +85,6 @@ func (s *DB) Close() error {
|
||||
return s.Bolt.Close()
|
||||
}
|
||||
|
||||
// Codec returns the EncodeDecoder used by this instance of Storm
|
||||
func (s *DB) Codec() codec.MarshalUnmarshaler {
|
||||
return s.codec
|
||||
}
|
||||
|
||||
// WithCodec returns a New Storm Node that will use the given Codec.
|
||||
func (s *DB) WithCodec(codec codec.MarshalUnmarshaler) Node {
|
||||
n := s.From().(*node)
|
||||
n.codec = codec
|
||||
return n
|
||||
}
|
||||
|
||||
// WithBatch returns a new Storm Node with the batch mode enabled.
|
||||
func (s *DB) WithBatch(enabled bool) Node {
|
||||
n := s.From().(*node)
|
||||
n.batchMode = enabled
|
||||
return n
|
||||
}
|
||||
|
||||
// Get a value from a bucket
|
||||
func (s *DB) Get(bucketName string, key interface{}, to interface{}) error {
|
||||
return s.root.Get(bucketName, key, to)
|
||||
}
|
||||
|
||||
// Set a key/value pair into a bucket
|
||||
func (s *DB) Set(bucketName string, key interface{}, value interface{}) error {
|
||||
return s.root.Set(bucketName, key, value)
|
||||
}
|
||||
|
||||
// Delete deletes a key from a bucket
|
||||
func (s *DB) Delete(bucketName string, key interface{}) error {
|
||||
return s.root.Delete(bucketName, key)
|
||||
}
|
||||
|
||||
// GetBytes gets a raw value from a bucket.
|
||||
func (s *DB) GetBytes(bucketName string, key interface{}) ([]byte, error) {
|
||||
return s.root.GetBytes(bucketName, key)
|
||||
}
|
||||
|
||||
// SetBytes sets a raw value into a bucket.
|
||||
func (s *DB) SetBytes(bucketName string, key interface{}, value []byte) error {
|
||||
return s.root.SetBytes(bucketName, key, value)
|
||||
}
|
||||
|
||||
// Save a structure
|
||||
func (s *DB) Save(data interface{}) error {
|
||||
return s.root.Save(data)
|
||||
}
|
||||
|
||||
// PrefixScan scans the root buckets for keys matching the given prefix.
|
||||
func (s *DB) PrefixScan(prefix string) []Node {
|
||||
return s.root.PrefixScan(prefix)
|
||||
}
|
||||
|
||||
// RangeScan scans the root buckets over a range such as a sortable time range.
|
||||
func (s *DB) RangeScan(min, max string) []Node {
|
||||
return s.root.RangeScan(min, max)
|
||||
}
|
||||
|
||||
// Select a list of records that match a list of matchers. Doesn't use indexes.
|
||||
func (s *DB) Select(matchers ...q.Matcher) Query {
|
||||
return s.root.Select(matchers...)
|
||||
}
|
||||
|
||||
// Range returns one or more records by the specified index within the specified range
|
||||
func (s *DB) Range(fieldName string, min, max, to interface{}, options ...func(*index.Options)) error {
|
||||
return s.root.Range(fieldName, min, max, to, options...)
|
||||
}
|
||||
|
||||
// Prefix returns one or more records whose given field starts with the specified prefix.
|
||||
func (s *DB) Prefix(fieldName string, prefix string, to interface{}, options ...func(*index.Options)) error {
|
||||
return s.root.Prefix(fieldName, prefix, to, options...)
|
||||
}
|
||||
|
||||
// AllByIndex gets all the records of a bucket that are indexed in the specified index
|
||||
func (s *DB) AllByIndex(fieldName string, to interface{}, options ...func(*index.Options)) error {
|
||||
return s.root.AllByIndex(fieldName, to, options...)
|
||||
}
|
||||
|
||||
// All get all the records of a bucket
|
||||
func (s *DB) All(to interface{}, options ...func(*index.Options)) error {
|
||||
return s.root.All(to, options...)
|
||||
}
|
||||
|
||||
// Count counts all the records of a bucket
|
||||
func (s *DB) Count(data interface{}) (int, error) {
|
||||
return s.root.Count(data)
|
||||
}
|
||||
|
||||
// DeleteStruct deletes a structure from the associated bucket
|
||||
func (s *DB) DeleteStruct(data interface{}) error {
|
||||
return s.root.DeleteStruct(data)
|
||||
}
|
||||
|
||||
// Drop a bucket
|
||||
func (s *DB) Drop(data interface{}) error {
|
||||
return s.root.Drop(data)
|
||||
}
|
||||
|
||||
// Find returns one or more records by the specified index
|
||||
func (s *DB) Find(fieldName string, value interface{}, to interface{}, options ...func(q *index.Options)) error {
|
||||
return s.root.Find(fieldName, value, to, options...)
|
||||
}
|
||||
|
||||
// Init creates the indexes and buckets for a given structure
|
||||
func (s *DB) Init(data interface{}) error {
|
||||
return s.root.Init(data)
|
||||
}
|
||||
|
||||
// ReIndex rebuilds all the indexes of a bucket
|
||||
func (s *DB) ReIndex(data interface{}) error {
|
||||
return s.root.ReIndex(data)
|
||||
}
|
||||
|
||||
// One returns one record by the specified index
|
||||
func (s *DB) One(fieldName string, value interface{}, to interface{}) error {
|
||||
return s.root.One(fieldName, value, to)
|
||||
}
|
||||
|
||||
// Begin starts a new transaction.
|
||||
func (s *DB) Begin(writable bool) (Node, error) {
|
||||
return s.root.Begin(writable)
|
||||
}
|
||||
|
||||
// Rollback closes the transaction and ignores all previous updates.
|
||||
func (s *DB) Rollback() error {
|
||||
return s.root.Rollback()
|
||||
}
|
||||
|
||||
// Commit writes all changes to disk.
|
||||
func (s *DB) Commit() error {
|
||||
return s.root.Rollback()
|
||||
}
|
||||
|
||||
// Update a structure
|
||||
func (s *DB) Update(data interface{}) error {
|
||||
return s.root.Update(data)
|
||||
}
|
||||
|
||||
// UpdateField updates a single field
|
||||
func (s *DB) UpdateField(data interface{}, fieldName string, value interface{}) error {
|
||||
return s.root.UpdateField(data, fieldName, value)
|
||||
}
|
||||
|
||||
// CreateBucketIfNotExists creates the bucket below the current node if it doesn't
|
||||
// already exist.
|
||||
func (s *DB) CreateBucketIfNotExists(tx *bolt.Tx, bucket string) (*bolt.Bucket, error) {
|
||||
return s.root.CreateBucketIfNotExists(tx, bucket)
|
||||
}
|
||||
|
||||
// GetBucket returns the given bucket below the current node.
|
||||
func (s *DB) GetBucket(tx *bolt.Tx, children ...string) *bolt.Bucket {
|
||||
return s.root.GetBucket(tx, children...)
|
||||
}
|
||||
|
||||
func (s *DB) checkVersion() error {
|
||||
var v string
|
||||
err := s.Get(dbinfo, "version", &v)
|
||||
|
@@ -33,7 +33,6 @@ func TestNewStorm(t *testing.T) {
|
||||
require.Implements(t, (*Node)(nil), db)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, file, db.Path)
|
||||
require.NotNil(t, db.Bolt)
|
||||
require.Equal(t, defaultCodec, db.Codec())
|
||||
|
||||
@@ -50,10 +49,7 @@ func TestNewStormWithStormOptions(t *testing.T) {
|
||||
dc := new(dummyCodec)
|
||||
db1, _ := Open(filepath.Join(dir, "storm1.db"), BoltOptions(0660, &bolt.Options{Timeout: 10 * time.Second}), Codec(dc), Root("a", "b"))
|
||||
require.Equal(t, dc, db1.Codec())
|
||||
require.Equal(t, os.FileMode(0660), db1.boltMode)
|
||||
require.Equal(t, 10*time.Second, db1.boltOptions.Timeout)
|
||||
require.Equal(t, []string{"a", "b"}, db1.rootBucket)
|
||||
require.Equal(t, []string{"a", "b"}, db1.root.rootBucket)
|
||||
require.Equal(t, []string{"a", "b"}, db1.Node.(*node).rootBucket)
|
||||
|
||||
err := db1.Save(&SimpleUser{ID: 1})
|
||||
require.NoError(t, err)
|
||||
@@ -69,7 +65,7 @@ func TestNewStormWithBatch(t *testing.T) {
|
||||
db1, _ := Open(filepath.Join(dir, "storm1.db"), Batch())
|
||||
defer db1.Close()
|
||||
|
||||
require.True(t, db1.root.batchMode)
|
||||
require.True(t, db1.Node.(*node).batchMode)
|
||||
n := db1.From().(*node)
|
||||
require.True(t, n.batchMode)
|
||||
n = db1.WithBatch(true).(*node)
|
||||
@@ -162,7 +158,7 @@ func TestToBytes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func createDB(t errorHandler, opts ...func(*DB) error) (*DB, func()) {
|
||||
func createDB(t errorHandler, opts ...func(*Options) error) (*DB, func()) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "storm")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
Reference in New Issue
Block a user