index iterator

This commit is contained in:
Asdine El Hrychy
2024-10-06 14:46:57 +04:00
parent 8bf8d5179e
commit 3ece66c2b6
6 changed files with 138 additions and 103 deletions

View File

@@ -85,7 +85,9 @@ func (idx *Index) Exists(vs []types.Value) (bool, *tree.Key, error) {
}
defer it.Close()
for it.First(); it.Valid(); it.Next() {
it.First()
if it.Valid() {
k, err := it.Key().Decode()
if err != nil {
return false, nil, err
@@ -93,12 +95,20 @@ func (idx *Index) Exists(vs []types.Value) (bool, *tree.Key, error) {
dKey = tree.NewEncodedKey(types.AsByteSlice(k[len(k)-1]))
found = true
break
}
return found, dKey, it.Error()
}
func (idx *Index) Iterator(rng *tree.Range) (*IndexIterator, error) {
it, err := idx.Tree.Iterator(rng)
if err != nil {
return nil, err
}
return &IndexIterator{it}, nil
}
// Delete all the references to the key from the index.
func (idx *Index) Delete(vs []types.Value, key []byte) error {
vk := tree.NewKey(vs...)
@@ -107,73 +117,30 @@ func (idx *Index) Delete(vs []types.Value, key []byte) error {
Max: vk,
}
err := idx.iterateOnRange(&rng, false, func(itmKey *tree.Key, pk *tree.Key) error {
if bytes.Equal(pk.Encoded, key) {
err := idx.Tree.Delete(itmKey)
if err == nil {
err = errStop
}
it, err := idx.Iterator(&rng)
if err != nil {
return err
}
defer it.Close()
for it.First(); it.Valid(); it.Next() {
pk, err := it.Value()
if err != nil {
return err
}
return nil
})
if errors.Is(err, errStop) {
return nil
if bytes.Equal(pk.Encoded, key) {
return idx.Tree.Delete(it.Key())
}
}
if err != nil {
if err := it.Error(); err != nil {
return err
}
return errors.WithStack(engine.ErrKeyNotFound)
}
func (idx *Index) IterateOnRange(rng *tree.Range, reverse bool, fn func(key *tree.Key) error) error {
return idx.iterateOnRange(rng, reverse, func(itmKey, key *tree.Key) error {
return fn(key)
})
}
func (idx *Index) iterateOnRange(rng *tree.Range, reverse bool, fn func(itmKey *tree.Key, key *tree.Key) error) error {
it, err := idx.Tree.Iterator(rng)
if err != nil {
return err
}
defer it.Close()
if !reverse {
it.First()
} else {
it.Last()
}
for it.Valid() {
k := it.Key()
// we don't care about the value, we just want to extract the key
// which is the last element of the encoded array
values, err := k.Decode()
if err != nil {
return err
}
pk := tree.NewEncodedKey(types.AsByteSlice(values[len(values)-1]))
err = fn(k, pk)
if err != nil {
return err
}
if !reverse {
it.Next()
} else {
it.Prev()
}
}
return it.Error()
}
// Truncate deletes all the index data.
func (idx *Index) Truncate() error {
return idx.Tree.Truncate()

View File

@@ -9,7 +9,6 @@ import (
"github.com/chaisql/chai/internal/testutil"
"github.com/chaisql/chai/internal/tree"
"github.com/chaisql/chai/internal/types"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -88,19 +87,26 @@ func TestIndexDelete(t *testing.T) {
pivot := values(types.NewIntegerValue(10))
i := 0
err := idx.IterateOnRange(&tree.Range{Min: testutil.NewKey(t, pivot...)}, false, func(key *tree.Key) error {
it, err := idx.Iterator(&tree.Range{Min: testutil.NewKey(t, pivot...)})
require.NoError(t, err)
defer it.Close()
for it.First(); it.Valid(); it.Next() {
k, err := it.Value()
require.NoError(t, err)
if i == 0 {
require.Equal(t, "other-key", string(key.Encoded))
require.Equal(t, "other-key", string(k.Encoded))
} else if i == 1 {
require.Equal(t, "yet-another-key", string(key.Encoded))
require.Equal(t, "yet-another-key", string(k.Encoded))
} else {
return errors.New("should not reach this point")
t.Fatalf("should not reach this point")
}
i++
return nil
})
require.NoError(t, err)
}
require.NoError(t, it.Error())
require.Equal(t, 2, i)
})
@@ -115,19 +121,26 @@ func TestIndexDelete(t *testing.T) {
pivot := values(types.NewIntegerValue(10))
i := 0
err := idx.IterateOnRange(&tree.Range{Min: testutil.NewKey(t, pivot...)}, false, func(key *tree.Key) error {
it, err := idx.Iterator(&tree.Range{Min: testutil.NewKey(t, pivot...)})
require.NoError(t, err)
defer it.Close()
for it.First(); it.Valid(); it.Next() {
k, err := it.Value()
require.NoError(t, err)
if i == 0 {
require.Equal(t, "other-key", string(key.Encoded))
require.Equal(t, "other-key", string(k.Encoded))
} else if i == 1 {
require.Equal(t, "yet-another-key", string(key.Encoded))
require.Equal(t, "yet-another-key", string(k.Encoded))
} else {
return errors.New("should not reach this point")
t.Fatal("should not reach this point")
}
i++
return nil
})
require.NoError(t, err)
}
require.NoError(t, it.Error())
require.Equal(t, 2, i)
})
@@ -187,9 +200,12 @@ func BenchmarkIndexIteration(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = idx.IterateOnRange(&tree.Range{Min: testutil.NewKey(b, types.NewTextValue(""))}, false, func(_ *tree.Key) error {
return nil
})
it, _ := idx.Iterator(&tree.Range{Min: testutil.NewKey(b, types.NewTextValue(""))})
for it.First(); it.Valid(); it.Next() {
}
it.Close()
}
b.StopTimer()
})
@@ -229,9 +245,12 @@ func BenchmarkCompositeIndexIteration(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = idx.IterateOnRange(&tree.Range{Min: testutil.NewKey(b, types.NewTextValue(""), types.NewTextValue(""))}, false, func(_ *tree.Key) error {
return nil
})
it, _ := idx.Iterator(&tree.Range{Min: testutil.NewKey(b, types.NewTextValue(""))})
for it.First(); it.Valid(); it.Next() {
}
it.Close()
}
b.StopTimer()
})

View File

@@ -75,14 +75,14 @@ func (r *Range) IsEqual(other *Range) bool {
return true
}
type Iterator struct {
type TableIterator struct {
*tree.Iterator
e EncodedRow
row BasicRow
}
func newIterator(ti *tree.Iterator, tableName string, columnConstraints *ColumnConstraints) *Iterator {
it := Iterator{
func newIterator(ti *tree.Iterator, tableName string, columnConstraints *ColumnConstraints) *TableIterator {
it := TableIterator{
Iterator: ti,
}
@@ -93,7 +93,7 @@ func newIterator(ti *tree.Iterator, tableName string, columnConstraints *ColumnC
return &it
}
func (it *Iterator) Value() (Row, error) {
func (it *TableIterator) Value() (Row, error) {
var err error
it.row.key = it.Iterator.Key()
@@ -104,3 +104,19 @@ func (it *Iterator) Value() (Row, error) {
return &it.row, nil
}
type IndexIterator struct {
*tree.Iterator
}
func (it *IndexIterator) Value() (*tree.Key, error) {
k := it.Iterator.Key()
// we don't care about the value, we just want to extract the key
// which is the last element of the encoded array
values, err := k.Decode()
if err != nil {
return nil, err
}
return tree.NewEncodedKey(types.AsByteSlice(values[len(values)-1])), nil
}

View File

@@ -141,7 +141,7 @@ func (t *Table) Put(key *tree.Key, r row.Row) (Row, error) {
}, err
}
func (t *Table) Iterator(rng *Range) (*Iterator, error) {
func (t *Table) Iterator(rng *Range) (*TableIterator, error) {
var columns []string
pk := t.Info.PrimaryKey

View File

@@ -72,11 +72,7 @@ func (it *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
newEnv.SetRow(&ptr)
if len(it.Ranges) == 0 {
return index.IterateOnRange(nil, it.Reverse, func(key *tree.Key) error {
ptr.ResetWith(table, key)
return fn(&newEnv)
})
return it.iterateOverRange(table, index, info, nil, &newEnv, &ptr, fn)
}
ranges, err := it.Ranges.Eval(in)
@@ -85,19 +81,7 @@ func (it *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
}
for _, rng := range ranges {
r, err := rng.ToTreeRange(&table.Info.ColumnConstraints, info.Columns)
if err != nil {
return err
}
err = index.IterateOnRange(r, it.Reverse, func(key *tree.Key) error {
ptr.ResetWith(table, key)
return fn(&newEnv)
})
if errors.Is(err, stream.ErrStreamClosed) {
err = nil
}
err = it.iterateOverRange(table, index, info, rng, &newEnv, &ptr, fn)
if err != nil {
return err
}
@@ -106,6 +90,55 @@ func (it *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
return nil
}
func (op *ScanOperator) iterateOverRange(table *database.Table, index *database.Index, info *database.IndexInfo, rng *database.Range, to *environment.Environment, ptr *database.LazyRow, fn func(out *environment.Environment) error) error {
var r *tree.Range
var err error
if rng != nil {
r, err = rng.ToTreeRange(&table.Info.ColumnConstraints, info.Columns)
if err != nil {
return err
}
}
it, err := index.Iterator(r)
if err != nil {
return err
}
defer it.Close()
if !op.Reverse {
it.First()
} else {
it.Last()
}
for it.Valid() {
key, err := it.Value()
if err != nil {
return err
}
ptr.ResetWith(table, key)
err = fn(to)
if errors.Is(err, stream.ErrStreamClosed) {
break
}
if err != nil {
return err
}
if !op.Reverse {
it.Next()
} else {
it.Prev()
}
}
return it.Error()
}
func (it *ScanOperator) Columns(env *environment.Environment) ([]string, error) {
tx := env.GetTx()

View File

@@ -59,7 +59,7 @@ func (op *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
var ranges []*database.Range
if op.Ranges == nil {
if len(op.Ranges) == 0 {
ranges = []*database.Range{nil}
} else {
ranges, err = op.Ranges.Eval(in)