Implement bolt engine

This commit is contained in:
Asdine El Hrychy
2019-08-02 07:51:15 +02:00
parent de1b36ab84
commit 5f7cde430e
6 changed files with 139 additions and 249 deletions

26
db.go
View File

@@ -15,9 +15,10 @@ import (
)
var (
seed = time.Now().UnixNano()
entropy = rand.New(rand.NewSource(seed))
ulidTs = ulid.Timestamp(time.Now())
seed = time.Now().UnixNano()
entropy = rand.New(rand.NewSource(seed))
ulidTs = ulid.Timestamp(time.Now())
separator byte = 0x1F
)
var (
@@ -181,12 +182,10 @@ func (tx Tx) DropTable(name string) error {
return err
}
const sep = 0x1e
func buildIndexName(tableName, field string) string {
var b strings.Builder
b.WriteString(tableName)
b.WriteByte(sep)
b.WriteByte(separator)
b.WriteString(field)
return b.String()
@@ -195,7 +194,12 @@ func buildIndexName(tableName, field string) string {
// CreateIndex creates an index with the given name.
// If it already exists, returns ErrTableAlreadyExists.
func (tx Tx) CreateIndex(tableName, field string) error {
err := tx.tx.CreateStore(buildIndexName(tableName, field))
_, err := tx.Table(tableName)
if err != nil {
return err
}
err = tx.tx.CreateStore(buildIndexName(tableName, field))
if err == engine.ErrStoreAlreadyExists {
return ErrIndexAlreadyExists
}
@@ -266,7 +270,7 @@ type Table struct {
// Iterate goes through all the records of the table and calls the given function by passing each one of them.
// If the given function returns an error, the iteration stops.
func (t Table) Iterate(fn func(recordID []byte, r record.Record) error) error {
return t.store.AscendGreater(nil, func(recordID, v []byte) error {
return t.store.AscendGreaterOrEqual(nil, func(recordID, v []byte) error {
return fn(recordID, record.EncodedRecord(v))
})
}
@@ -399,7 +403,7 @@ func (t Table) Truncate() error {
// If the field data is empty, it is filled with the zero value of the field type.
// If a record already has the field, no change is performed on that record.
func (t Table) AddField(f field.Field) error {
return t.store.AscendGreater(nil, func(recordID, v []byte) error {
return t.store.AscendGreaterOrEqual(nil, func(recordID, v []byte) error {
var fb record.FieldBuffer
err := fb.ScanRecord(record.EncodedRecord(v))
if err != nil {
@@ -427,7 +431,7 @@ func (t Table) AddField(f field.Field) error {
// DeleteField changes the table structure by deleting a field from all the records.
func (t Table) DeleteField(name string) error {
return t.store.AscendGreater(nil, func(recordID []byte, v []byte) error {
return t.store.AscendGreaterOrEqual(nil, func(recordID []byte, v []byte) error {
var fb record.FieldBuffer
err := fb.ScanRecord(record.EncodedRecord(v))
if err != nil {
@@ -451,7 +455,7 @@ func (t Table) DeleteField(name string) error {
// RenameField changes the table structure by renaming the selected field on all the records.
func (t Table) RenameField(oldName, newName string) error {
return t.store.AscendGreater(nil, func(recordID []byte, v []byte) error {
return t.store.AscendGreaterOrEqual(nil, func(recordID []byte, v []byte) error {
var fb record.FieldBuffer
err := fb.ScanRecord(record.EncodedRecord(v))
if err != nil {

View File

@@ -5,9 +5,6 @@ import (
"os"
"github.com/asdine/genji/engine"
"github.com/asdine/genji/index"
"github.com/asdine/genji/record"
"github.com/asdine/genji/table"
bolt "github.com/etcd-io/bbolt"
)
@@ -72,142 +69,57 @@ func (t *Transaction) Commit() error {
return t.tx.Commit()
}
// Table returns a table by name. The table uses a Bolt bucket.
func (t *Transaction) Table(name string, codec record.Codec) (table.Table, error) {
// Store returns a store by name. The store uses a Bolt bucket.
func (t *Transaction) Store(name string) (engine.Store, error) {
bname := []byte(name)
b := t.tx.Bucket(bname)
if b == nil {
return nil, engine.ErrTableNotFound
return nil, engine.ErrStoreNotFound
}
return &Table{
return &Store{
bucket: b,
codec: codec,
tx: t.tx,
name: bname,
}, nil
}
// CreateTable creates a bolt bucket and returns a table.
// If the table already exists, returns engine.ErrTableAlreadyExists.
func (t *Transaction) CreateTable(name string) error {
// CreateStore creates a bolt bucket and returns a store.
// If the store already exists, returns engine.ErrStoreAlreadyExists.
func (t *Transaction) CreateStore(name string) error {
if !t.writable {
return engine.ErrTransactionReadOnly
}
_, err := t.tx.CreateBucket([]byte(name))
if err == bolt.ErrBucketExists {
return engine.ErrTableAlreadyExists
return engine.ErrStoreAlreadyExists
}
return err
}
// DropTable deletes the underlying bucket.
func (t *Transaction) DropTable(name string) error {
// DropStore deletes the underlying bucket.
func (t *Transaction) DropStore(name string) error {
if !t.writable {
return engine.ErrTransactionReadOnly
}
err := t.tx.DeleteBucket([]byte(name))
if err == bolt.ErrBucketNotFound {
return engine.ErrTableNotFound
return engine.ErrStoreNotFound
}
return err
}
// CreateIndex creates an index in a sub bucket of the table bucket.
func (t *Transaction) CreateIndex(table, fieldName string) error {
if !t.writable {
return engine.ErrTransactionReadOnly
}
b := t.tx.Bucket([]byte(table))
if b == nil {
return engine.ErrTableNotFound
}
bb, err := b.CreateBucketIfNotExists([]byte(indexBucketName))
if err != nil {
return err
}
_, err = bb.CreateBucket([]byte(fieldName))
if err == bolt.ErrBucketExists {
return engine.ErrIndexAlreadyExists
}
return err
}
// Index returns an index by name.
func (t *Transaction) Index(table, fieldName string) (index.Index, error) {
b := t.tx.Bucket([]byte(table))
if b == nil {
return nil, engine.ErrTableNotFound
}
bb := b.Bucket([]byte(indexBucketName))
if bb == nil {
return nil, engine.ErrIndexNotFound
}
ib := bb.Bucket([]byte(fieldName))
if ib == nil {
return nil, engine.ErrIndexNotFound
}
return &Index{
b: ib,
}, nil
}
// Indexes lists all the indexes of this table.
func (t *Transaction) Indexes(table string) (map[string]index.Index, error) {
b := t.tx.Bucket([]byte(table))
if b == nil {
return nil, engine.ErrTableNotFound
}
m := make(map[string]index.Index)
bb := b.Bucket([]byte(indexBucketName))
if bb == nil {
return nil, nil
}
err := bb.ForEach(func(k, _ []byte) error {
m[string(k)] = &Index{
b: bb.Bucket(k),
}
// ListStores returns a list of all the store names.
func (t *Transaction) ListStores(name string) ([]string, error) {
var names []string
err := t.tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
names = append(names, string(name))
return nil
})
return m, err
}
// DropIndex drops an index by name, removing its corresponding bucket.
func (t *Transaction) DropIndex(table, fieldName string) error {
if !t.writable {
return engine.ErrTransactionReadOnly
}
b := t.tx.Bucket([]byte(table))
if b == nil {
return engine.ErrTableNotFound
}
bb := b.Bucket([]byte(indexBucketName))
if bb == nil {
return engine.ErrIndexNotFound
}
err := bb.DeleteBucket([]byte(fieldName))
if err == bolt.ErrBucketNotFound {
return engine.ErrIndexNotFound
}
return err
return names, err
}

104
engine/bolt/store.go Normal file
View File

@@ -0,0 +1,104 @@
package bolt
import (
"bytes"
"github.com/asdine/genji/engine"
"github.com/asdine/genji/table"
bolt "github.com/etcd-io/bbolt"
)
// A Store is an implementation of the engine.Store interface using a bucket.
type Store struct {
bucket *bolt.Bucket
tx *bolt.Tx
name []byte
}
// Put stores a key value pair. If it already exists, it overrides it.
func (s *Store) Put(k, v []byte) error {
if !s.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
return s.bucket.Put(k, v)
}
// Get returns a value associated with the given key. If not found, returns engine.ErrKeyNotFound.
func (s *Store) Get(k []byte) ([]byte, error) {
v := s.bucket.Get(k)
if v == nil {
return nil, engine.ErrKeyNotFound
}
return v, nil
}
// Delete a record by recordID. If not found, returns table.ErrRecordNotFound.
func (s *Store) Delete(k []byte) error {
if !s.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
v := s.bucket.Get(k)
if v == nil {
return table.ErrRecordNotFound
}
return s.bucket.Delete(k)
}
// AscendGreaterOrEqual seeks for the pivot and then goes through all the subsequent key value pairs in increasing order and calls the given function for each pair.
// If the given function returns an error, the iteration stops and returns that error.
// If the pivot is nil, starts from the beginning.
func (s *Store) AscendGreaterOrEqual(pivot []byte, fn func(k, v []byte) error) error {
c := s.bucket.Cursor()
for k, v := c.Seek(pivot); k != nil; k, v = c.Next() {
err := fn(k, v)
if err != nil {
return err
}
}
return nil
}
// DescendLessOrEqual seeks for the pivot and then goes through all the subsequent key value pairs in descreasing order and calls the given function for each pair.
// If the given function returns an error, the iteration stops and returns that error.
// If the pivot is nil, starts from the end.
func (s *Store) DescendLessOrEqual(pivot []byte, fn func(k, v []byte) error) error {
c := s.bucket.Cursor()
k, v := c.Seek(pivot)
if k == nil {
k, v = c.Last()
} else {
for bytes.Compare(k, pivot) > 0 {
k, v = c.Prev()
}
}
for k != nil {
err := fn(k, v)
if err != nil {
return err
}
k, v = c.Prev()
}
return nil
}
// Truncate deletes all the records of the store.
func (s *Store) Truncate() error {
if !s.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
err := s.tx.DeleteBucket(s.name)
if err != nil {
return err
}
_, err = s.tx.CreateBucket(s.name)
return err
}

View File

@@ -1,128 +0,0 @@
package bolt
import (
"github.com/asdine/genji/engine"
"github.com/asdine/genji/field"
"github.com/asdine/genji/record"
"github.com/asdine/genji/table"
bolt "github.com/etcd-io/bbolt"
)
// A Table is represented by a bucket.
// Each record is stored as a key value pair, where the recordID is stored as the key.
// Table uses the codec to encode the record and store is as the value.
type Table struct {
bucket *bolt.Bucket
codec record.Codec
tx *bolt.Tx
name []byte
}
// Insert a record into the table bucket. If the record implements the table.Pker interface,
// it uses the returned value as the recordID. If not, it generates a recordID using NextSequence.
func (t *Table) Insert(r record.Record) (recordID []byte, err error) {
if !t.bucket.Writable() {
return nil, engine.ErrTransactionReadOnly
}
if pker, ok := r.(table.Pker); ok {
recordID, err = pker.Pk()
if err != nil {
return nil, err
}
} else {
seq, err := t.bucket.NextSequence()
if err != nil {
return nil, err
}
recordID = field.EncodeUint64(seq)
}
data, err := t.codec.Encode(r)
if err != nil {
return nil, err
}
err = t.bucket.Put(recordID, data)
if err != nil {
return nil, err
}
return recordID, nil
}
// Record returns a record by recordID. If not found, returns table.ErrRecordNotFound.
func (t *Table) Record(recordID []byte) (record.Record, error) {
v := t.bucket.Get(recordID)
if v == nil {
return nil, table.ErrRecordNotFound
}
return t.codec.Decode(v)
}
// Delete a record by recordID. If not found, returns table.ErrRecordNotFound.
func (t *Table) Delete(recordID []byte) error {
if !t.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
v := t.bucket.Get(recordID)
if v == nil {
return table.ErrRecordNotFound
}
return t.bucket.Delete(recordID)
}
// Iterate through all the records of the table until the end or until fn
// returns an error.
func (t *Table) Iterate(fn func([]byte, record.Record) error) error {
return t.bucket.ForEach(func(k, v []byte) error {
if v == nil {
return nil
}
r, err := t.codec.Decode(v)
if err != nil {
return err
}
return fn(k, r)
})
}
// Replace a record by recordID. If not found, returns table.ErrRecordNotFound.
func (t *Table) Replace(recordID []byte, r record.Record) error {
if !t.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
v := t.bucket.Get(recordID)
if v == nil {
return table.ErrRecordNotFound
}
v, err := t.codec.Encode(r)
if err != nil {
return err
}
return t.bucket.Put(recordID, v)
}
// Truncate deletes all the records of the table.
func (t *Table) Truncate() error {
if !t.bucket.Writable() {
return engine.ErrTransactionReadOnly
}
err := t.tx.DeleteBucket(t.name)
if err != nil {
return err
}
_, err = t.tx.CreateBucket(t.name)
return err
}

View File

@@ -38,7 +38,7 @@ type Transaction interface {
Store(name string) (Store, error)
CreateStore(name string) error
DropStore(name string) error
StoreList(prefix string) ([]string, error)
ListStores(prefix string) ([]string, error)
}
// A Store manages key value pairs. It is an abstraction on top of data structures that can provide random readThe store can be implemented by any data stru
@@ -51,11 +51,11 @@ type Store interface {
Delete(k []byte) error
// Truncate deletes all the key value pairs from the store.
Truncate() error
// AscendGreater seeks for the pivot and then goes through all the subsequent key value pairs in increasing order and calls the given function for each pair.
// AscendGreaterOrEqual seeks for the pivot and then goes through all the subsequent key value pairs in increasing order and calls the given function for each pair.
// If the given function returns an error, the iteration stops and returns that error.
// If the pivot is nil, starts from the beginning.
AscendGreaterOrEqual(pivot []byte, fn func(k, v []byte) error) error
// DescendGreater seeks for the pivot and then goes through all the subsequent key value pairs in descreasing order and calls the given function for each pair.
// DescendLessOrEqual seeks for the pivot and then goes through all the subsequent key value pairs in descreasing order and calls the given function for each pair.
// If the given function returns an error, the iteration stops and returns that error.
// If the pivot is nil, starts from the end.
DescendLessOrEqual(pivot []byte, fn func(k, v []byte) error) error

View File

@@ -21,7 +21,6 @@ type User struct {
func Example() {
ng := memory.NewEngine()
db := genji.New(ng)
defer db.Close()
// open a read-write transaction
err := db.Update(func(tx *genji.Tx) error {
@@ -60,7 +59,6 @@ func Example() {
func ExampleDB() {
ng := memory.NewEngine()
db := genji.New(ng)
defer db.Close()
err := db.Update(func(tx *genji.Tx) error {
err := tx.CreateTable("Table")