mirror of
https://github.com/timshannon/bolthold.git
synced 2025-12-24 13:37:58 +08:00
296 lines
7.5 KiB
Go
296 lines
7.5 KiB
Go
// Copyright 2016 Tim Shannon. All rights reserved.
|
|
// Use of this source code is governed by the MIT license
|
|
// that can be found in the LICENSE file.
|
|
|
|
package bolthold
|
|
|
|
import (
|
|
"errors"
|
|
"reflect"
|
|
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
// ErrKeyExists is the error returned when data is being Inserted for a Key that already exists
|
|
var ErrKeyExists = errors.New("This Key already exists in this bolthold for this type")
|
|
|
|
// ErrUniqueExists is the error thrown when data is being inserted for a unique constraint value that already exists
|
|
var ErrUniqueExists = errors.New("This value cannot be written due to the unique constraint on the field")
|
|
|
|
// sequence tells bolthold to insert the key as the next sequence in the bucket
|
|
type sequence struct{}
|
|
|
|
// NextSequence is used to create a sequential key for inserts
|
|
// Inserts a uint64 as the key
|
|
// store.Insert(bolthold.NextSequence(), data)
|
|
func NextSequence() interface{} {
|
|
return sequence{}
|
|
}
|
|
|
|
// Insert inserts the passed in data into the the bolthold
|
|
//
|
|
// If the the key already exists in the bolthold, then an ErrKeyExists is returned
|
|
// If the data struct has a field tagged as `boltholdKey` and it is the same type
|
|
// as the Insert key, AND the data struct is passed by reference, AND the key field
|
|
// is currently set to the zero-value for that type, then that field will be set to
|
|
// the value of the insert key.
|
|
//
|
|
// To use this with bolthold.NextSequence() use a type of `uint64` for the key field.
|
|
func (s *Store) Insert(key, data interface{}) error {
|
|
return s.Bolt().Update(func(tx *bolt.Tx) error {
|
|
return s.insert(tx, key, data)
|
|
})
|
|
}
|
|
|
|
// TxInsert is the same as Insert except it allows you specify your own transaction
|
|
func (s *Store) TxInsert(tx *bolt.Tx, key, data interface{}) error {
|
|
if !tx.Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.insert(tx, key, data)
|
|
}
|
|
|
|
// InsertIntoBucket is the same as Insert except it allows you specify your own parent bucket
|
|
func (s *Store) InsertIntoBucket(parent *bolt.Bucket, key, data interface{}) error {
|
|
if !parent.Tx().Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.insert(parent, key, data)
|
|
}
|
|
|
|
func (s *Store) insert(source BucketSource, key, data interface{}) error {
|
|
storer := s.newStorer(data)
|
|
|
|
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, ok := key.(sequence); ok {
|
|
key, err = b.NextSequence()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
gk, err := s.encode(key)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if b.Get(gk) != nil {
|
|
return ErrKeyExists
|
|
}
|
|
|
|
value, err := s.encode(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// insert data
|
|
err = b.Put(gk, value)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// insert any indexes
|
|
err = s.addIndexes(storer, source, gk, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dataVal := reflect.Indirect(reflect.ValueOf(data))
|
|
if !dataVal.CanSet() {
|
|
return nil
|
|
}
|
|
dataType := dataVal.Type()
|
|
|
|
for i := 0; i < dataType.NumField(); i++ {
|
|
tf := dataType.Field(i)
|
|
// XXX: should we require standard tag format so we can use StructTag.Lookup()?
|
|
// XXX: should we use strings.Contains(string(tf.Tag), BoltholdKeyTag) so we don't require proper tags?
|
|
if _, ok := tf.Tag.Lookup(BoltholdKeyTag); ok {
|
|
fieldValue := dataVal.Field(i)
|
|
keyValue := reflect.ValueOf(key)
|
|
if keyValue.Type() != tf.Type {
|
|
break
|
|
}
|
|
if !fieldValue.CanSet() {
|
|
break
|
|
}
|
|
if !reflect.DeepEqual(fieldValue.Interface(), reflect.Zero(tf.Type).Interface()) {
|
|
break
|
|
}
|
|
fieldValue.Set(keyValue)
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update updates an existing record in the bolthold
|
|
// if the Key doesn't already exist in the store, then it fails with ErrNotFound
|
|
func (s *Store) Update(key interface{}, data interface{}) error {
|
|
return s.Bolt().Update(func(tx *bolt.Tx) error {
|
|
return s.update(tx, key, data)
|
|
})
|
|
}
|
|
|
|
// TxUpdate is the same as Update except it allows you to specify your own transaction
|
|
func (s *Store) TxUpdate(tx *bolt.Tx, key interface{}, data interface{}) error {
|
|
if !tx.Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.update(tx, key, data)
|
|
}
|
|
|
|
// UpdateBucket allows you to run an update against any parent bucket
|
|
func (s *Store) UpdateBucket(parent *bolt.Bucket, key interface{}, data interface{}) error {
|
|
if !parent.Tx().Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.update(parent, key, data)
|
|
|
|
}
|
|
|
|
func (s *Store) update(source BucketSource, key interface{}, data interface{}) error {
|
|
storer := s.newStorer(data)
|
|
|
|
gk, err := s.encode(key)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
existing := b.Get(gk)
|
|
|
|
if existing == nil {
|
|
return ErrNotFound
|
|
}
|
|
|
|
// delete any existing indexes
|
|
existingVal := newElemType(data)
|
|
|
|
err = s.decode(existing, existingVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.deleteIndexes(storer, source, gk, existingVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
value, err := s.encode(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// put data
|
|
err = b.Put(gk, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// insert any new indexes
|
|
return s.addIndexes(storer, source, gk, data)
|
|
}
|
|
|
|
// Upsert inserts the record into the bolthold if it doesn't exist. If it does already exist, then it updates
|
|
// the existing record
|
|
func (s *Store) Upsert(key interface{}, data interface{}) error {
|
|
return s.Bolt().Update(func(tx *bolt.Tx) error {
|
|
return s.upsert(tx, key, data)
|
|
})
|
|
}
|
|
|
|
// TxUpsert is the same as Upsert except it allows you to specify your own transaction
|
|
func (s *Store) TxUpsert(tx *bolt.Tx, key interface{}, data interface{}) error {
|
|
if !tx.Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.upsert(tx, key, data)
|
|
}
|
|
|
|
// UpsertBucket allows you to run an upsert against any bucket parent
|
|
func (s *Store) UpsertBucket(parent *bolt.Bucket, key interface{}, data interface{}) error {
|
|
if !parent.Tx().Writable() {
|
|
return bolt.ErrTxNotWritable
|
|
}
|
|
return s.upsert(parent, key, data)
|
|
}
|
|
|
|
func (s *Store) upsert(source BucketSource, key interface{}, data interface{}) error {
|
|
storer := s.newStorer(data)
|
|
|
|
gk, err := s.encode(key)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
existing := b.Get(gk)
|
|
|
|
if existing != nil {
|
|
existingVal := newElemType(data)
|
|
|
|
err = s.decode(existing, existingVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.deleteIndexes(storer, source, gk, existingVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
value, err := s.encode(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// put data
|
|
err = b.Put(gk, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// insert any new indexes
|
|
return s.addIndexes(storer, source, gk, data)
|
|
}
|
|
|
|
// UpdateMatching runs the update function for every record that match the passed in query
|
|
// Note that the type of record in the update func always has to be a pointer
|
|
func (s *Store) UpdateMatching(dataType interface{}, query *Query, update func(record interface{}) error) error {
|
|
return s.Bolt().Update(func(tx *bolt.Tx) error {
|
|
return s.updateQuery(tx, dataType, query, update)
|
|
})
|
|
}
|
|
|
|
// TxUpdateMatching does the same as UpdateMatching, but allows you to specify your own transaction
|
|
func (s *Store) TxUpdateMatching(tx *bolt.Tx, dataType interface{}, query *Query,
|
|
update func(record interface{}) error) error {
|
|
return s.updateQuery(tx, dataType, query, update)
|
|
}
|
|
|
|
// UpdateMatchingInBucket does the same as UpdateMatching, but allows you to specify your own parent bucket
|
|
func (s *Store) UpdateMatchingInBucket(parent *bolt.Bucket, dataType interface{}, query *Query,
|
|
update func(record interface{}) error) error {
|
|
return s.updateQuery(parent, dataType, query, update)
|
|
}
|