This commit is contained in:
Asdine El Hrychy
2024-01-20 14:59:25 +04:00
parent 6d933a39b3
commit 4faae4a6e9
15 changed files with 465 additions and 111 deletions

14
go.mod
View File

@@ -23,7 +23,14 @@ require (
github.com/getsentry/sentry-go v0.25.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/kelindar/bitmap v1.4.1 // indirect
github.com/kelindar/column v0.4.1 // indirect
github.com/kelindar/intmap v1.1.0 // indirect
github.com/kelindar/iostream v1.3.0 // indirect
github.com/kelindar/simd v1.1.2 // indirect
github.com/kelindar/smutex v1.0.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
@@ -34,6 +41,13 @@ require (
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
<<<<<<< Updated upstream
=======
github.com/tidwall/btree v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
>>>>>>> Stashed changes
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect

22
go.sum
View File

@@ -34,10 +34,24 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kelindar/bitmap v1.4.1 h1:Ih0BWMYXkkZxPMU536DsQKRhdvqFl7tuNjImfLJWC6E=
github.com/kelindar/bitmap v1.4.1/go.mod h1:4QyD+TDbfgy8oYB9oC4JzqfudYCYIjhbSP7iLraP+28=
github.com/kelindar/column v0.4.1 h1:eonRwgTe3EEFdrBWSNnWyXYiy5/JgMzfNoH8AHkMpng=
github.com/kelindar/column v0.4.1/go.mod h1:sZbenXtC3tNtfTRc3uifH8Z5TOn4kvDXA0yAdmyjnjs=
github.com/kelindar/intmap v1.1.0 h1:S+YEDvw5FQus5UJDEG+xsLp8il3BTYqBMkkuVVZPMH8=
github.com/kelindar/intmap v1.1.0/go.mod h1:tDanawPWq1B0HC+X3W8Z6IKNrJqxjruy6CdyTlf6Nic=
github.com/kelindar/iostream v1.3.0 h1:Bz2qQabipZlF1XCk64bnxsGLete+iHtayGPeWVpbwbo=
github.com/kelindar/iostream v1.3.0/go.mod h1:MkjMuVb6zGdPQVdwLnFRO0xOTOdDvBWTztFmjRDQkXk=
github.com/kelindar/simd v1.1.2 h1:KduKb+M9cMY2HIH8S/cdJyD+5n5EGgq+Aeeleos55To=
github.com/kelindar/simd v1.1.2/go.mod h1:inq4DFudC7W8L5fhxoeZflLRNpWSs0GNx6MlWFvuvr0=
github.com/kelindar/smutex v1.0.0 h1:+LIZYwPz+v3IWPOse764fNaVQGMVxKV6mbD6OWjQV3o=
github.com/kelindar/smutex v1.0.0/go.mod h1:nMbCZeAHWCsY9Kt4JqX7ETd+NJeR6Swy9im+Th+qUZQ=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -65,12 +79,19 @@ github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUz
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -90,6 +111,7 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -286,7 +286,7 @@ func (t *TableConstraint) String() string {
// TableConstraints holds the list of CHECK constraints.
type TableConstraints []*TableConstraint
// ValidateRow checks all the table constraint for the given row.
// ValidateRow checks all the table constraints for the given row.
func (t *TableConstraints) ValidateRow(tx *Transaction, r Row) error {
for _, tc := range *t {
if tc.Check == nil {

View File

@@ -142,6 +142,22 @@ func (t *Table) Put(key *tree.Key, o types.Object) (Row, error) {
}
func (t *Table) IterateOnRange(rng *Range, reverse bool, fn func(key *tree.Key, r Row) error) error {
e := EncodedObject{
fieldConstraints: &t.Info.FieldConstraints,
}
row := BasicRow{
tableName: t.Info.TableName,
obj: &e,
}
return t.IterateRawOnRange(rng, reverse, func(k *tree.Key, enc []byte) error {
row.key = k
e.encoded = enc
return fn(k, &row)
})
}
func (t *Table) IterateRawOnRange(rng *Range, reverse bool, fn func(key *tree.Key, row []byte) error) error {
var paths []object.Path
pk := t.Info.PrimaryKey
@@ -159,19 +175,7 @@ func (t *Table) IterateOnRange(rng *Range, reverse bool, fn func(key *tree.Key,
}
}
e := EncodedObject{
fieldConstraints: &t.Info.FieldConstraints,
}
row := BasicRow{
tableName: t.Info.TableName,
obj: &e,
}
return t.Tree.IterateOnRange(r, reverse, func(k *tree.Key, enc []byte) error {
row.key = k
e.encoded = enc
return fn(k, &row)
})
return t.Tree.IterateOnRange(r, reverse, fn)
}
// GetRow returns one row by key.

View File

@@ -14,27 +14,24 @@ type Param struct {
Name string
// Value is the parameter value.
Value interface{}
Value any
}
// Environment contains information about the context in which
// the expression is evaluated.
type Environment struct {
Params []Param
Vars *object.FieldBuffer
Row database.Row
Bloc Bloc
DB *database.Database
Tx *database.Transaction
baseRow database.BasicRow
Outer *Environment
}
func New(r database.Row, params ...Param) *Environment {
func New(b Bloc, params ...Param) *Environment {
env := Environment{
Params: params,
Row: r,
Bloc: b,
}
return &env
@@ -48,48 +45,20 @@ func (e *Environment) SetOuter(env *Environment) {
e.Outer = env
}
func (e *Environment) Get(path object.Path) (v types.Value, ok bool) {
if e.Vars != nil {
v, err := path.GetValueFromObject(e.Vars)
if err == nil {
return v, true
}
func (e *Environment) GetBloc() (Bloc, bool) {
if e.Bloc != nil {
return e.Bloc, true
}
if e.Outer != nil {
return e.Outer.Get(path)
}
return types.NewNullValue(), false
}
func (e *Environment) Set(path object.Path, v types.Value) {
if e.Vars == nil {
e.Vars = object.NewFieldBuffer()
}
e.Vars.Set(path, v)
}
func (e *Environment) GetRow() (database.Row, bool) {
if e.Row != nil {
return e.Row, true
}
if e.Outer != nil {
return e.Outer.GetRow()
return e.Outer.GetBloc()
}
return nil, false
}
func (e *Environment) SetRow(r database.Row) {
e.Row = r
}
func (e *Environment) SetRowFromObject(o types.Object) {
e.baseRow.ResetWith("", nil, o)
e.Row = &e.baseRow
func (e *Environment) SetBloc(b Bloc) {
e.Bloc = b
}
func (e *Environment) SetParams(params []Param) {
@@ -150,3 +119,9 @@ func (e *Environment) GetDB() *database.Database {
return nil
}
type Bloc interface {
Next() database.Row
Len() int
Close() error
}

View File

@@ -23,11 +23,6 @@ func (p Path) Eval(env *environment.Environment) (types.Value, error) {
}
dp := object.Path(p)
v, ok := env.Get(dp)
if ok {
return v, nil
}
v, err := dp.GetValueFromObject(r.Object())
if errors.Is(err, types.ErrFieldNotFound) {
return NullLiteral, nil

209
internal/stream/bloc.go Normal file
View File

@@ -0,0 +1,209 @@
package stream
import (
"encoding/binary"
"github.com/chaisql/chai/internal/database"
"github.com/chaisql/chai/internal/tree"
"github.com/chaisql/chai/internal/types"
"github.com/kelindar/column"
"github.com/valyala/bytebufferpool"
)
type BytesBloc struct {
schema *database.TableInfo
data *bytebufferpool.ByteBuffer
cursor int
count int
// reused objects
encodedObj database.EncodedObject
row database.BasicRow
buf []byte
}
func NewBytesBloc(schema *database.TableInfo) *BytesBloc {
bb := BytesBloc{
schema: schema,
data: bytebufferpool.Get(),
}
return &bb
}
func (b *BytesBloc) Add(key *tree.Key, record []byte) error {
if key == nil || len(key.Encoded) == 0 {
panic("key is empty or not encoded")
}
// write key length
if cap(b.buf) < binary.MaxVarintLen64 {
b.buf = make([]byte, binary.MaxVarintLen64)
} else {
b.buf = b.buf[:binary.MaxVarintLen64]
}
n := binary.PutUvarint(b.buf, uint64(len(key.Encoded)))
_, err := b.data.Write(b.buf[:n])
if err != nil {
return err
}
// write key
_, err = b.data.Write(key.Encoded)
if err != nil {
return err
}
// write record length
n = binary.PutUvarint(b.buf, uint64(len(record)))
_, err = b.data.Write(b.buf[:n])
if err != nil {
return err
}
// write record
_, err = b.data.Write(record)
if err != nil {
return err
}
b.count++
return nil
}
func (b *BytesBloc) Next() database.Row {
if b.cursor >= b.data.Len() {
return nil
}
data := b.data.Bytes()
// read key length
length, offset := binary.Uvarint(data[b.cursor:])
b.cursor += offset
// read key
key := data[b.cursor : b.cursor+int(length)]
b.cursor += int(length)
// read record length
length, offset = binary.Uvarint(data[b.cursor:])
b.cursor += offset
// read record
record := data[b.cursor : b.cursor+int(length)]
b.cursor += int(length)
b.encodedObj.ResetWith(&b.schema.FieldConstraints, record)
b.cursor += int(length)
b.row.ResetWith(b.schema.TableName, tree.NewEncodedKey(key), &b.encodedObj)
return &b.row
}
func (b *BytesBloc) Reset() {
b.cursor = 0
b.count = 0
b.data.Reset()
}
func (b *BytesBloc) Len() int {
return b.count
}
func (b *BytesBloc) Close() error {
bytebufferpool.Put(b.data)
return nil
}
type RowBloc struct {
rows []database.Row
cursor int
}
func NewRowBloc() *RowBloc {
return &RowBloc{}
}
func (b *RowBloc) Add(r database.Row) {
b.rows = append(b.rows, r)
}
func (b *RowBloc) Next() database.Row {
if b.cursor >= len(b.rows) {
return nil
}
r := b.rows[b.cursor]
b.cursor++
return r
}
func (b *RowBloc) Reset() {
b.cursor = 0
b.rows = b.rows[:0]
}
func (b *RowBloc) Len() int {
return len(b.rows)
}
func (b *RowBloc) Close() error {
return nil
}
type ColumnBlock struct {
columns column.Collection
}
func (c *ColumnBlock) CreateColumn(name string, typ types.ValueType) error {
switch typ {
case types.BooleanValue:
return c.columns.CreateColumn(name, column.ForBool())
case types.IntegerValue:
return c.columns.CreateColumn(name, column.ForInt())
case types.DoubleValue:
return c.columns.CreateColumn(name, column.ForFloat64())
case types.TextValue:
return c.columns.CreateColumn(name, column.ForString())
case types.BlobValue:
return c.columns.CreateColumn(name, column.ForRecord(func() *blobColumn {
return new(blobColumn)
}))
case types.ArrayValue:
return c.columns.CreateColumn(name, column.ForRecord(func() *blobColumn {
return new(blobColumn)
}))
case types.ObjectValue:
}
c.columns.CreateColumn(name)
}
type blobColumn []byte
func (b blobColumn) MarshalBinary() ([]byte, error) {
return b, nil
}
func (l *blobColumn) UnmarshalBinary(b []byte) error {
*l = b
return nil
}
type arrayColumn types.Array
func (b arrayColumn) MarshalBinary() ([]byte, error) {
return b, nil
}
func (l *arrayColumn) UnmarshalBinary(b []byte) error {
*l = b
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/chaisql/chai/internal/database"
"github.com/chaisql/chai/internal/environment"
"github.com/chaisql/chai/internal/expr"
"github.com/chaisql/chai/internal/stream"
@@ -26,6 +27,8 @@ func (op *EmitOperator) Iterate(in *environment.Environment, fn func(out *enviro
var newEnv environment.Environment
newEnv.SetOuter(in)
bloc := stream.NewRowBloc()
for _, e := range op.Exprs {
v, err := e.Eval(in)
if err != nil {
@@ -35,14 +38,10 @@ func (op *EmitOperator) Iterate(in *environment.Environment, fn func(out *enviro
return errors.WithStack(stream.ErrInvalidResult)
}
newEnv.SetRowFromObject(types.As[types.Object](v))
err = fn(&newEnv)
if err != nil {
return err
}
bloc.Add(database.NewBasicRow(types.As[types.Object](v)))
}
return nil
return fn(&newEnv)
}
func (op *EmitOperator) String() string {

View File

@@ -33,14 +33,19 @@ func (op *DeleteOperator) Iterate(in *environment.Environment, f func(out *envir
}
}
r, ok := out.GetRow()
bloc, ok := out.GetBloc()
if !ok {
return errors.New("missing row")
return errors.New("missing bloc")
}
err := table.Delete(r.Key())
if err != nil {
return err
r := bloc.Next()
for r != nil {
err := table.Delete(r.Key())
if err != nil {
return err
}
r = bloc.Next()
}
return f(out)

View File

@@ -25,12 +25,15 @@ func (op *InsertOperator) Iterate(in *environment.Environment, f func(out *envir
var newEnv environment.Environment
var table *database.Table
newBloc := stream.NewRowBloc()
return op.Prev.Iterate(in, func(out *environment.Environment) error {
newEnv.SetOuter(out)
r, ok := out.GetRow()
bloc, ok := out.GetBloc()
if !ok {
return errors.New("missing row")
return errors.New("missing bloc")
}
var err error
@@ -41,12 +44,19 @@ func (op *InsertOperator) Iterate(in *environment.Environment, f func(out *envir
}
}
_, r, err = table.Insert(r.Object())
if err != nil {
return err
r := bloc.Next()
for r != nil {
_, newRow, err := table.Insert(r.Object())
if err != nil {
return err
}
newBloc.Add(newRow)
r = bloc.Next()
}
newEnv.SetRow(r)
newEnv.SetBloc(newBloc)
return f(&newEnv)
})

View File

@@ -25,9 +25,9 @@ func (op *ReplaceOperator) Iterate(in *environment.Environment, f func(out *envi
var table *database.Table
it := func(out *environment.Environment) error {
r, ok := out.GetRow()
bloc, ok := out.GetBloc()
if !ok {
return errors.New("missing row")
return errors.New("missing bloc")
}
if table == nil {
@@ -38,9 +38,14 @@ func (op *ReplaceOperator) Iterate(in *environment.Environment, f func(out *envi
}
}
_, err := table.Replace(r.Key(), r.Object())
if err != nil {
return err
r := bloc.Next()
for r != nil {
_, err := table.Replace(r.Key(), r.Object())
if err != nil {
return err
}
r = bloc.Next()
}
return f(out)

View File

@@ -59,11 +59,29 @@ func (it *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
}
}
for _, rng := range ranges {
err = table.IterateOnRange(rng, it.Reverse, func(key *tree.Key, r database.Row) error {
newEnv.SetRow(r)
bloc := stream.NewBytesBloc(table.Info)
defer bloc.Close()
return fn(&newEnv)
newEnv.SetBloc(bloc)
for _, rng := range ranges {
err = table.IterateRawOnRange(rng, it.Reverse, func(key *tree.Key, r []byte) error {
err = bloc.Add(key, r)
if err != nil {
return err
}
if bloc.Len() < 50 {
return nil
}
err = fn(&newEnv)
if err != nil {
return err
}
bloc.Reset()
return nil
})
if errors.Is(err, stream.ErrStreamClosed) {
err = nil
@@ -73,6 +91,13 @@ func (it *ScanOperator) Iterate(in *environment.Environment, fn func(out *enviro
}
}
if bloc.Len() > 0 {
err = fn(&newEnv)
if err != nil {
return err
}
}
return nil
}

View File

@@ -149,18 +149,27 @@ func TestTableScan(t *testing.T) {
var i int
var got testutil.Objs
err := op.Iterate(&env, func(env *environment.Environment) error {
r, ok := env.GetRow()
b, ok := env.GetBloc()
require.True(t, ok)
var fb object.FieldBuffer
err := fb.Copy(r.Object())
assert.NoError(t, err)
r := b.Next()
require.NotNil(t, r)
for r != nil {
fb.Reset()
err := fb.Copy(r.Object())
assert.NoError(t, err)
got = append(got, &fb)
v, err := env.GetParamByName("foo")
assert.NoError(t, err)
require.Equal(t, types.NewIntegerValue(1), v)
i++
r = b.Next()
}
got = append(got, &fb)
v, err := env.GetParamByName("foo")
assert.NoError(t, err)
require.Equal(t, types.NewIntegerValue(1), v)
i++
return nil
})
if test.fails {

View File

@@ -37,34 +37,44 @@ func (op *ValidateOperator) Iterate(in *environment.Environment, fn func(out *en
var newEnv environment.Environment
newBloc := stream.NewBytesBloc(info)
var br database.BasicRow
var eo database.EncodedObject
return op.Prev.Iterate(in, func(out *environment.Environment) error {
buf = buf[:0]
newEnv.SetOuter(out)
row, ok := out.GetRow()
bloc, ok := out.GetBloc()
if !ok {
return errors.New("missing row")
return errors.New("missing bloc")
}
// generate default values, validate and encode row
buf, err = info.EncodeObject(tx, buf, row.Object())
if err != nil {
return err
row := bloc.Next()
for row != nil {
// generate default values, validate and encode row
buf, err = info.EncodeObject(tx, buf, row.Object())
if err != nil {
return err
}
// use the encoded row as the new row
eo.ResetWith(&info.FieldConstraints, buf)
br.ResetWith(row.TableName(), row.Key(), &eo)
// validate CHECK constraints if any
err := info.TableConstraints.ValidateRow(tx, &br)
if err != nil {
return err
}
newBloc.Add(row.Key(), buf)
row = bloc.Next()
}
// use the encoded row as the new row
eo.ResetWith(&info.FieldConstraints, buf)
br.ResetWith(row.TableName(), row.Key(), &eo)
newEnv.SetRow(&br)
// validate CHECK constraints if any
err := info.TableConstraints.ValidateRow(tx, newEnv.Row)
if err != nil {
return err
}
newEnv.SetBloc(newBloc)
return fn(&newEnv)
})

View File

@@ -0,0 +1,72 @@
package stream2
import (
"fmt"
"github.com/chaisql/chai/internal/environment"
"github.com/chaisql/chai/internal/expr"
"github.com/chaisql/chai/internal/object"
"github.com/chaisql/chai/internal/stream"
"github.com/chaisql/chai/internal/types"
"github.com/pkg/errors"
)
type Operator interface {
Next(*environment.Environment) (Bloc, error)
Close() error
}
// A TakeOperator closes the stream after a certain number of values.
type TakeOperator struct {
E expr.Expr
child Operator
}
// Take closes the stream after n values have passed through the operator.
func Take(child Operator, e expr.Expr) *TakeOperator {
return &TakeOperator{E: e}
}
// Iterate implements the Operator interface.
func (op *TakeOperator) Next(in *environment.Environment) (Bloc, error) {
v, err := op.E.Eval(in)
if err != nil {
return nil, err
}
if !v.Type().IsNumber() {
return nil, fmt.Errorf("limit expression must evaluate to a number, got %q", v.Type())
}
v, err = object.CastAsInteger(v)
if err != nil {
return nil, err
}
n := types.As[int64](v)
var count int64
for count < n {
bloc, err := op.child.Next(in)
if err != nil {
return nil, err
}
count += int64(bloc.Len())
}
bloc, err := op.child.Next(in)
return op.Prev.Iterate(in, func(out *environment.Environment) error {
if count < n {
count++
return f(out)
}
return errors.WithStack(stream.ErrStreamClosed)
})
}
func (op *TakeOperator) String() string {
return fmt.Sprintf("rows.Take(%s)", op.E)
}