mirror of
https://github.com/chaisql/chai.git
synced 2025-09-26 19:51:21 +08:00
remove stream.Clone
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/chaisql/chai/internal/database"
|
||||
errs "github.com/chaisql/chai/internal/errors"
|
||||
"github.com/chaisql/chai/internal/planner"
|
||||
"github.com/chaisql/chai/internal/stream"
|
||||
"github.com/chaisql/chai/internal/stream/index"
|
||||
"github.com/chaisql/chai/internal/stream/table"
|
||||
@@ -93,11 +94,17 @@ func (stmt *CreateIndexStmt) Run(ctx *Context) (*Result, error) {
|
||||
Pipe(index.Insert(stmt.Info.IndexName)).
|
||||
Pipe(stream.Discard())
|
||||
|
||||
ss := PreparedStreamStmt{
|
||||
Stream: s,
|
||||
st, err := planner.Optimize(s, ctx.Conn.GetTx().Catalog, ctx.Params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ss.Run(ctx)
|
||||
return &Result{
|
||||
Result: &StreamStmtResult{
|
||||
Stream: st,
|
||||
Context: ctx,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateSequenceStmt represents a parsed CREATE SEQUENCE statement.
|
||||
|
@@ -23,7 +23,7 @@ type PreparedStreamStmt struct {
|
||||
// Run returns a result containing the stream. The stream will be executed by calling the Iterate method of
|
||||
// the result.
|
||||
func (s *PreparedStreamStmt) Run(ctx *Context) (*Result, error) {
|
||||
st, err := planner.Optimize(s.Stream.Clone(), ctx.Conn.GetTx().Catalog, ctx.Params)
|
||||
st, err := planner.Optimize(s.Stream, ctx.Conn.GetTx().Catalog, ctx.Params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -18,18 +18,6 @@ func Concat(s ...*Stream) *ConcatOperator {
|
||||
return &ConcatOperator{Streams: s}
|
||||
}
|
||||
|
||||
func (it *ConcatOperator) Clone() Operator {
|
||||
streams := make([]*Stream, len(it.Streams))
|
||||
for i, s := range it.Streams {
|
||||
streams[i] = s.Clone()
|
||||
}
|
||||
|
||||
return &ConcatOperator{
|
||||
BaseOperator: it.BaseOperator.Clone(),
|
||||
Streams: streams,
|
||||
}
|
||||
}
|
||||
|
||||
func (it *ConcatOperator) Columns(env *environment.Environment) ([]string, error) {
|
||||
if len(it.Streams) == 0 {
|
||||
return nil, nil
|
||||
|
@@ -39,15 +39,6 @@ func ValidateOnConflictDoNothing(indexName string) *ValidateOperator {
|
||||
}
|
||||
}
|
||||
|
||||
func (op *ValidateOperator) Clone() stream.Operator {
|
||||
return &ValidateOperator{
|
||||
BaseOperator: op.BaseOperator.Clone(),
|
||||
IndexName: op.IndexName,
|
||||
OnConflict: op.OnConflict.Clone(),
|
||||
OnConflictDoNothing: op.OnConflictDoNothing,
|
||||
}
|
||||
}
|
||||
|
||||
func (op *ValidateOperator) Iterator(in *environment.Environment) (stream.Iterator, error) {
|
||||
tx := in.GetTx()
|
||||
|
||||
@@ -160,10 +151,8 @@ func (it *ValidateIterator) Next() bool {
|
||||
it.br.ResetWith(it.row.TableName(), key, it.row)
|
||||
|
||||
// execute the onConflict stream
|
||||
clone := it.onConflict.Clone()
|
||||
|
||||
stream.InsertBefore(clone.Op, stream.Rows(it.columns, &it.br))
|
||||
newIt, err := clone.Iterator(it.env)
|
||||
stream.InsertBefore(it.onConflict.Op, stream.Rows(it.columns, &it.br))
|
||||
newIt, err := it.onConflict.Iterator(it.env)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@@ -29,7 +29,6 @@ type Operator interface {
|
||||
GetNext() Operator
|
||||
GetPrev() Operator
|
||||
String() string
|
||||
Clone() Operator
|
||||
Columns(env *environment.Environment) ([]string, error)
|
||||
}
|
||||
|
||||
|
@@ -115,25 +115,6 @@ func (s *Stream) String() string {
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (s *Stream) Clone() *Stream {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.Op == nil {
|
||||
return New(nil)
|
||||
}
|
||||
|
||||
op := s.First()
|
||||
var ops []Operator
|
||||
for op != nil {
|
||||
ops = append(ops, op.Clone())
|
||||
op = op.GetNext()
|
||||
}
|
||||
|
||||
return New(Pipe(ops...))
|
||||
}
|
||||
|
||||
func InsertBefore(op, newOp Operator) Operator {
|
||||
if op != nil {
|
||||
prev := op.GetPrev()
|
||||
|
@@ -39,15 +39,6 @@ func GenerateKeyOnConflictDoNothing(tableName string) *GenerateKeyOperator {
|
||||
}
|
||||
}
|
||||
|
||||
func (op *GenerateKeyOperator) Clone() stream.Operator {
|
||||
return &GenerateKeyOperator{
|
||||
BaseOperator: op.BaseOperator.Clone(),
|
||||
TableName: op.TableName,
|
||||
OnConflict: op.OnConflict.Clone(),
|
||||
OnConflictDoNothing: op.OnConflictDoNothing,
|
||||
}
|
||||
}
|
||||
|
||||
func (op *GenerateKeyOperator) Iterator(in *environment.Environment) (stream.Iterator, error) {
|
||||
tx := in.GetTx()
|
||||
|
||||
@@ -186,10 +177,9 @@ func (it *GenerateKeyIterator) generateKey(r database.Row) (*tree.Key, error) {
|
||||
it.br.ResetWith(it.tableName, k, r)
|
||||
|
||||
// execute the onConflict stream
|
||||
clone := it.onConflict.Clone()
|
||||
stream.InsertBefore(clone.Op, stream.Rows(it.columns, &it.br))
|
||||
stream.InsertBefore(it.onConflict.Op, stream.Rows(it.columns, &it.br))
|
||||
|
||||
newIt, err := clone.Iterator(it.env)
|
||||
newIt, err := it.onConflict.Iterator(it.env)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -22,18 +22,6 @@ func Union(s ...*Stream) *UnionOperator {
|
||||
return &UnionOperator{Streams: s}
|
||||
}
|
||||
|
||||
func (it *UnionOperator) Clone() Operator {
|
||||
streams := make([]*Stream, len(it.Streams))
|
||||
for i, s := range it.Streams {
|
||||
streams[i] = s.Clone()
|
||||
}
|
||||
|
||||
return &UnionOperator{
|
||||
BaseOperator: it.BaseOperator.Clone(),
|
||||
Streams: streams,
|
||||
}
|
||||
}
|
||||
|
||||
func (it *UnionOperator) Columns(env *environment.Environment) ([]string, error) {
|
||||
if len(it.Streams) == 0 {
|
||||
return nil, nil
|
||||
|
Reference in New Issue
Block a user