Files
bolthold/put.go
2021-07-14 11:27:21 -05:00

296 lines
7.5 KiB
Go

// Copyright 2016 Tim Shannon. All rights reserved.
// Use of this source code is governed by the MIT license
// that can be found in the LICENSE file.
package bolthold
import (
"errors"
"reflect"
bolt "go.etcd.io/bbolt"
)
// ErrKeyExists is the error returned when data is being Inserted for a Key that already exists
var ErrKeyExists = errors.New("This Key already exists in this bolthold for this type")
// ErrUniqueExists is the error thrown when data is being inserted for a unique constraint value that already exists
var ErrUniqueExists = errors.New("This value cannot be written due to the unique constraint on the field")
// sequence tells bolthold to insert the key as the next sequence in the bucket
type sequence struct{}
// NextSequence is used to create a sequential key for inserts
// Inserts a uint64 as the key
// store.Insert(bolthold.NextSequence(), data)
func NextSequence() interface{} {
return sequence{}
}
// Insert inserts the passed in data into the the bolthold
//
// If the the key already exists in the bolthold, then an ErrKeyExists is returned
// If the data struct has a field tagged as `boltholdKey` and it is the same type
// as the Insert key, AND the data struct is passed by reference, AND the key field
// is currently set to the zero-value for that type, then that field will be set to
// the value of the insert key.
//
// To use this with bolthold.NextSequence() use a type of `uint64` for the key field.
func (s *Store) Insert(key, data interface{}) error {
return s.Bolt().Update(func(tx *bolt.Tx) error {
return s.insert(tx, key, data)
})
}
// TxInsert is the same as Insert except it allows you specify your own transaction
func (s *Store) TxInsert(tx *bolt.Tx, key, data interface{}) error {
if !tx.Writable() {
return bolt.ErrTxNotWritable
}
return s.insert(tx, key, data)
}
// InsertIntoBucket is the same as Insert except it allows you specify your own parent bucket
func (s *Store) InsertIntoBucket(parent *bolt.Bucket, key, data interface{}) error {
if !parent.Tx().Writable() {
return bolt.ErrTxNotWritable
}
return s.insert(parent, key, data)
}
func (s *Store) insert(source BucketSource, key, data interface{}) error {
storer := s.newStorer(data)
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
if err != nil {
return err
}
if _, ok := key.(sequence); ok {
key, err = b.NextSequence()
if err != nil {
return err
}
}
gk, err := s.encode(key)
if err != nil {
return err
}
if b.Get(gk) != nil {
return ErrKeyExists
}
value, err := s.encode(data)
if err != nil {
return err
}
// insert data
err = b.Put(gk, value)
if err != nil {
return err
}
// insert any indexes
err = s.addIndexes(storer, source, gk, data)
if err != nil {
return err
}
dataVal := reflect.Indirect(reflect.ValueOf(data))
if !dataVal.CanSet() {
return nil
}
dataType := dataVal.Type()
for i := 0; i < dataType.NumField(); i++ {
tf := dataType.Field(i)
// XXX: should we require standard tag format so we can use StructTag.Lookup()?
// XXX: should we use strings.Contains(string(tf.Tag), BoltholdKeyTag) so we don't require proper tags?
if _, ok := tf.Tag.Lookup(BoltholdKeyTag); ok {
fieldValue := dataVal.Field(i)
keyValue := reflect.ValueOf(key)
if keyValue.Type() != tf.Type {
break
}
if !fieldValue.CanSet() {
break
}
if !reflect.DeepEqual(fieldValue.Interface(), reflect.Zero(tf.Type).Interface()) {
break
}
fieldValue.Set(keyValue)
break
}
}
return nil
}
// Update updates an existing record in the bolthold
// if the Key doesn't already exist in the store, then it fails with ErrNotFound
func (s *Store) Update(key interface{}, data interface{}) error {
return s.Bolt().Update(func(tx *bolt.Tx) error {
return s.update(tx, key, data)
})
}
// TxUpdate is the same as Update except it allows you to specify your own transaction
func (s *Store) TxUpdate(tx *bolt.Tx, key interface{}, data interface{}) error {
if !tx.Writable() {
return bolt.ErrTxNotWritable
}
return s.update(tx, key, data)
}
// UpdateBucket allows you to run an update against any parent bucket
func (s *Store) UpdateBucket(parent *bolt.Bucket, key interface{}, data interface{}) error {
if !parent.Tx().Writable() {
return bolt.ErrTxNotWritable
}
return s.update(parent, key, data)
}
func (s *Store) update(source BucketSource, key interface{}, data interface{}) error {
storer := s.newStorer(data)
gk, err := s.encode(key)
if err != nil {
return err
}
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
if err != nil {
return err
}
existing := b.Get(gk)
if existing == nil {
return ErrNotFound
}
// delete any existing indexes
existingVal := newElemType(data)
err = s.decode(existing, existingVal)
if err != nil {
return err
}
err = s.deleteIndexes(storer, source, gk, existingVal)
if err != nil {
return err
}
value, err := s.encode(data)
if err != nil {
return err
}
// put data
err = b.Put(gk, value)
if err != nil {
return err
}
// insert any new indexes
return s.addIndexes(storer, source, gk, data)
}
// Upsert inserts the record into the bolthold if it doesn't exist. If it does already exist, then it updates
// the existing record
func (s *Store) Upsert(key interface{}, data interface{}) error {
return s.Bolt().Update(func(tx *bolt.Tx) error {
return s.upsert(tx, key, data)
})
}
// TxUpsert is the same as Upsert except it allows you to specify your own transaction
func (s *Store) TxUpsert(tx *bolt.Tx, key interface{}, data interface{}) error {
if !tx.Writable() {
return bolt.ErrTxNotWritable
}
return s.upsert(tx, key, data)
}
// UpsertBucket allows you to run an upsert against any bucket parent
func (s *Store) UpsertBucket(parent *bolt.Bucket, key interface{}, data interface{}) error {
if !parent.Tx().Writable() {
return bolt.ErrTxNotWritable
}
return s.upsert(parent, key, data)
}
func (s *Store) upsert(source BucketSource, key interface{}, data interface{}) error {
storer := s.newStorer(data)
gk, err := s.encode(key)
if err != nil {
return err
}
b, err := source.CreateBucketIfNotExists([]byte(storer.Type()))
if err != nil {
return err
}
existing := b.Get(gk)
if existing != nil {
existingVal := newElemType(data)
err = s.decode(existing, existingVal)
if err != nil {
return err
}
err = s.deleteIndexes(storer, source, gk, existingVal)
if err != nil {
return err
}
}
value, err := s.encode(data)
if err != nil {
return err
}
// put data
err = b.Put(gk, value)
if err != nil {
return err
}
// insert any new indexes
return s.addIndexes(storer, source, gk, data)
}
// UpdateMatching runs the update function for every record that match the passed in query
// Note that the type of record in the update func always has to be a pointer
func (s *Store) UpdateMatching(dataType interface{}, query *Query, update func(record interface{}) error) error {
return s.Bolt().Update(func(tx *bolt.Tx) error {
return s.updateQuery(tx, dataType, query, update)
})
}
// TxUpdateMatching does the same as UpdateMatching, but allows you to specify your own transaction
func (s *Store) TxUpdateMatching(tx *bolt.Tx, dataType interface{}, query *Query,
update func(record interface{}) error) error {
return s.updateQuery(tx, dataType, query, update)
}
// UpdateMatchingInBucket does the same as UpdateMatching, but allows you to specify your own parent bucket
func (s *Store) UpdateMatchingInBucket(parent *bolt.Bucket, dataType interface{}, query *Query,
update func(record interface{}) error) error {
return s.updateQuery(parent, dataType, query, update)
}