diff --git a/finder.go b/finder.go index 933b568..8a88fe6 100644 --- a/finder.go +++ b/finder.go @@ -58,6 +58,7 @@ func (n *node) One(fieldName string, value interface{}, to interface{}) error { field, ok := cfg.Fields[fieldName] if !ok || (!field.IsID && field.Index == "") { query := newQuery(n, q.StrictEq(fieldName, value)) + query.Limit(1) if n.tx != nil { err = query.query(n.tx, sink) @@ -141,9 +142,8 @@ func (n *node) Find(fieldName string, value interface{}, to interface{}, options field, ok := cfg.Fields[fieldName] if !ok || (!field.IsID && (field.Index == "" || value == nil)) { - sink.limit = opts.Limit - sink.skip = opts.Skip query := newQuery(n, q.Eq(fieldName, value)) + query.Skip(opts.Skip).Limit(opts.Limit) if opts.Reverse { query.Reverse() @@ -190,14 +190,14 @@ func (n *node) find(tx *bolt.Tx, bucketName, fieldName string, cfg *structConfig sink.results = reflect.MakeSlice(reflect.Indirect(sink.ref).Type(), len(list), len(list)) - sorter := newSorter(n, sink, nil, nil, false) + sorter := newSorter(n, sink) for i := range list { raw := bucket.Get(list[i]) if raw == nil { return ErrNotFound } - if _, err := sorter.filter(bucket, list[i], raw); err != nil { + if _, err := sorter.filter(nil, bucket, list[i], raw); err != nil { return err } } @@ -337,9 +337,8 @@ func (n *node) Range(fieldName string, min, max, to interface{}, options ...func field, ok := cfg.Fields[fieldName] if !ok || (!field.IsID && field.Index == "") { - sink.limit = opts.Limit - sink.skip = opts.Skip query := newQuery(n, q.And(q.Gte(fieldName, min), q.Lte(fieldName, max))) + query.Skip(opts.Skip).Limit(opts.Limit) if opts.Reverse { query.Reverse() @@ -389,14 +388,14 @@ func (n *node) rnge(tx *bolt.Tx, bucketName, fieldName string, cfg *structConfig } sink.results = reflect.MakeSlice(reflect.Indirect(sink.ref).Type(), len(list), len(list)) - sorter := newSorter(n, sink, nil, nil, false) + sorter := newSorter(n, sink) for i := range list { raw := bucket.Get(list[i]) if raw == nil { return ErrNotFound } - if _, err := sorter.filter(bucket, list[i], raw); err != nil { + if _, err := sorter.filter(nil, bucket, list[i], raw); err != nil { return err } } diff --git a/query.go b/query.go index b52db37..0f2b52b 100644 --- a/query.go +++ b/query.go @@ -101,8 +101,6 @@ func (q *query) Find(to interface{}) error { return err } - sink.limit = q.limit - sink.skip = q.skip return q.runQuery(sink) } @@ -112,7 +110,7 @@ func (q *query) First(to interface{}) error { return err } - sink.skip = q.skip + q.limit = 1 return q.runQuery(sink) } @@ -122,9 +120,6 @@ func (q *query) Delete(kind interface{}) error { return err } - sink.limit = q.limit - sink.skip = q.skip - return q.runQuery(sink) } @@ -134,9 +129,6 @@ func (q *query) Count(kind interface{}) (int, error) { return 0, err } - sink.limit = q.limit - sink.skip = q.skip - err = q.runQuery(sink) if err != nil { return 0, err @@ -148,9 +140,6 @@ func (q *query) Count(kind interface{}) (int, error) { func (q *query) Raw() ([][]byte, error) { sink := newRawSink() - sink.limit = q.limit - sink.skip = q.skip - err := q.runQuery(sink) if err != nil { return nil, err @@ -162,8 +151,6 @@ func (q *query) Raw() ([][]byte, error) { func (q *query) RawEach(fn func([]byte, []byte) error) error { sink := newRawSink() - sink.limit = q.limit - sink.skip = q.skip sink.execFn = fn return q.runQuery(sink) @@ -175,8 +162,6 @@ func (q *query) Each(kind interface{}, fn func(interface{}) error) error { return err } - sink.limit = q.limit - sink.skip = q.skip sink.execFn = fn return q.runQuery(sink) @@ -207,7 +192,11 @@ func (q *query) query(tx *bolt.Tx, sink sink) error { return sink.flush() } - sorter := newSorter(q.node, sink, q.tree, q.orderBy, q.reverse) + sorter := newSorter(q.node, sink) + sorter.orderBy = q.orderBy + sorter.reverse = q.reverse + sorter.skip = q.skip + sorter.limit = q.limit if bucket != nil { c := internal.Cursor{C: bucket.Cursor(), Reverse: q.reverse} for k, v := c.First(); k != nil; k, v = c.Next() { @@ -215,7 +204,7 @@ func (q *query) query(tx *bolt.Tx, sink sink) error { continue } - stop, err := sorter.filter(bucket, k, v) + stop, err := sorter.filter(q.tree, bucket, k, v) if err != nil { return err } diff --git a/sink.go b/sink.go index b0dbdca..0387b38 100644 --- a/sink.go +++ b/sink.go @@ -16,58 +16,49 @@ type item struct { v []byte } -func newSorter(n Node, snk sink, tree q.Matcher, orderBy []string, reverse bool) *sorter { +func newSorter(n Node, snk sink) *sorter { return &sorter{ - node: n, - sink: snk, - tree: tree, - orderBy: orderBy, - reverse: reverse, - list: make([]*item, 0), - err: make(chan error), - done: make(chan struct{}), + node: n, + sink: snk, + skip: 0, + limit: -1, + list: make([]*item, 0), + err: make(chan error), + done: make(chan struct{}), } } type sorter struct { node Node sink sink - tree q.Matcher list []*item + skip int + limit int orderBy []string reverse bool err chan error done chan struct{} } -func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) { +func (s *sorter) filter(tree q.Matcher, bucket *bolt.Bucket, k, v []byte) (bool, error) { + itm := &item{ + bucket: bucket, + k: k, + v: v, + } rsink, ok := s.sink.(reflectSink) if !ok { - return s.sink.add(&item{ - bucket: bucket, - k: k, - v: v, - }) + return s.add(itm) } newElem := rsink.elem() if err := s.node.Codec().Unmarshal(v, newElem.Interface()); err != nil { return false, err } + itm.value = &newElem - itm := &item{ - bucket: bucket, - value: &newElem, - k: k, - v: v, - } - - if s.tree == nil { - if len(s.orderBy) == 0 { - return s.sink.add(itm) - } - } else { - ok, err := s.tree.Match(newElem.Interface()) + if tree != nil { + ok, err := tree.Match(newElem.Interface()) if err != nil { return false, err } @@ -77,7 +68,12 @@ func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) { } if len(s.orderBy) == 0 { - return s.sink.add(itm) + return s.add(itm) + } + + if _, ok := s.sink.(sliceSink); ok { + // add directly to sink, we'll apply skip/limits after sorting + return false, s.sink.add(itm) } s.list = append(s.list, itm) @@ -85,6 +81,25 @@ func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) { return false, nil } +func (s *sorter) add(itm *item) (stop bool, err error) { + if s.limit == 0 { + return true, nil + } + + if s.skip > 0 { + s.skip-- + return false, nil + } + + if s.limit > 0 { + s.limit-- + } + + err = s.sink.add(itm) + + return s.limit == 0, err +} + func (s *sorter) compareValue(left reflect.Value, right reflect.Value) int { if !left.IsValid() || !right.IsValid() { if left.IsValid() { @@ -195,11 +210,31 @@ func (s *sorter) flush() error { return err } + if ssink, ok := s.sink.(sliceSink); ok { + if !ssink.slice().IsValid() { + return s.sink.flush() + } + skip := s.skip + if s.skip >= ssink.slice().Len() { + ssink.reset() + return s.sink.flush() + } + if skip < 0 { + skip = 0 + } + limit := s.limit + if skip+limit > ssink.slice().Len() || limit < 1 { + limit = ssink.slice().Len() + } + ssink.setSlice(ssink.slice().Slice(skip, limit)) + return s.sink.flush() + } + for _, itm := range s.list { if itm == nil { break } - stop, err := s.sink.add(itm) + stop, err := s.add(itm) if err != nil { return err } @@ -217,18 +252,12 @@ func (s *sorter) Len() int { case <-s.done: return 0 default: - return len(s.list) } -} + if ssink, ok := s.sink.(sliceSink); ok { + return ssink.slice().Len() + } + return len(s.list) -func (s *sorter) Swap(i, j int) { - // skip if we encountered an earlier error - select { - case <-s.done: - return - default: - s.list[i], s.list[j] = s.list[j], s.list[i] - } } func (s *sorter) Less(i, j int) bool { @@ -239,13 +268,16 @@ func (s *sorter) Less(i, j int) bool { default: } + if ssink, ok := s.sink.(sliceSink); ok { + return s.less(ssink.slice().Index(i), ssink.slice().Index(j)) + } return s.less(*s.list[i].value, *s.list[j].value) } type sink interface { bucketName() string flush() error - add(*item) (bool, error) + add(*item) error readOnly() bool } @@ -253,6 +285,12 @@ type reflectSink interface { elem() reflect.Value } +type sliceSink interface { + slice() reflect.Value + setSlice(reflect.Value) + reset() +} + func newListSink(node Node, to interface{}) (*listSink, error) { ref := reflect.ValueOf(to) @@ -277,7 +315,6 @@ func newListSink(node Node, to interface{}) (*listSink, error) { isPtr: sliceType.Elem().Kind() == reflect.Ptr, elemType: elemType, name: elemType.Name(), - limit: -1, results: reflect.MakeSlice(reflect.Indirect(ref).Type(), 0, 0), }, nil } @@ -289,11 +326,21 @@ type listSink struct { elemType reflect.Type name string isPtr bool - skip int - limit int idx int } +func (l *listSink) slice() reflect.Value { + return l.results +} + +func (l *listSink) setSlice(s reflect.Value) { + l.results = s +} + +func (l *listSink) reset() { + l.results = reflect.MakeSlice(reflect.Indirect(l.ref).Type(), 0, 0) +} + func (l *listSink) elem() reflect.Value { if l.results.IsValid() && l.idx < l.results.Len() { return l.results.Index(l.idx).Addr() @@ -305,20 +352,7 @@ func (l *listSink) bucketName() string { return l.name } -func (l *listSink) add(i *item) (bool, error) { - if l.limit == 0 { - return true, nil - } - - if l.skip > 0 { - l.skip-- - return false, nil - } - - if l.limit > 0 { - l.limit-- - } - +func (l *listSink) add(i *item) error { if l.idx == l.results.Len() { if l.isPtr { l.results = reflect.Append(l.results, *i.value) @@ -329,7 +363,7 @@ func (l *listSink) add(i *item) (bool, error) { l.idx++ - return l.limit == 0, nil + return nil } func (l *listSink) flush() error { @@ -361,7 +395,6 @@ func newFirstSink(node Node, to interface{}) (*firstSink, error) { type firstSink struct { node Node ref reflect.Value - skip int found bool } @@ -373,15 +406,10 @@ func (f *firstSink) bucketName() string { return reflect.Indirect(f.ref).Type().Name() } -func (f *firstSink) add(i *item) (bool, error) { - if f.skip > 0 { - f.skip-- - return false, nil - } - +func (f *firstSink) add(i *item) error { reflect.Indirect(f.ref).Set(i.value.Elem()) f.found = true - return true, nil + return nil } func (f *firstSink) flush() error { @@ -412,8 +440,6 @@ func newDeleteSink(node Node, kind interface{}) (*deleteSink, error) { type deleteSink struct { node Node ref reflect.Value - skip int - limit int removed int } @@ -425,19 +451,10 @@ func (d *deleteSink) bucketName() string { return reflect.Indirect(d.ref).Type().Name() } -func (d *deleteSink) add(i *item) (bool, error) { - if d.skip > 0 { - d.skip-- - return false, nil - } - - if d.limit > 0 { - d.limit-- - } - +func (d *deleteSink) add(i *item) error { info, err := extract(&d.ref) if err != nil { - return false, err + return err } for fieldName, fieldCfg := range info.Fields { @@ -446,20 +463,20 @@ func (d *deleteSink) add(i *item) (bool, error) { } idx, err := getIndex(i.bucket, fieldCfg.Index, fieldName) if err != nil { - return false, err + return err } err = idx.RemoveID(i.k) if err != nil { if err == index.ErrNotFound { - return false, ErrNotFound + return ErrNotFound } - return false, err + return err } } d.removed++ - return d.limit == 0, i.bucket.Delete(i.k) + return i.bucket.Delete(i.k) } func (d *deleteSink) flush() error { @@ -490,8 +507,6 @@ func newCountSink(node Node, kind interface{}) (*countSink, error) { type countSink struct { node Node ref reflect.Value - skip int - limit int counter int } @@ -503,18 +518,9 @@ func (c *countSink) bucketName() string { return reflect.Indirect(c.ref).Type().Name() } -func (c *countSink) add(i *item) (bool, error) { - if c.skip > 0 { - c.skip-- - return false, nil - } - - if c.limit > 0 { - c.limit-- - } - +func (c *countSink) add(i *item) error { c.counter++ - return c.limit == 0, nil + return nil } func (c *countSink) flush() error { @@ -526,42 +532,25 @@ func (c *countSink) readOnly() bool { } func newRawSink() *rawSink { - return &rawSink{ - limit: -1, - } + return &rawSink{} } type rawSink struct { results [][]byte - skip int - limit int execFn func([]byte, []byte) error } -func (r *rawSink) add(i *item) (bool, error) { - if r.limit == 0 { - return true, nil - } - - if r.skip > 0 { - r.skip-- - return false, nil - } - - if r.limit > 0 { - r.limit-- - } - +func (r *rawSink) add(i *item) error { if r.execFn != nil { err := r.execFn(i.k, i.v) if err != nil { - return false, err + return err } } else { r.results = append(r.results, i.v) } - return r.limit == 0, nil + return nil } func (r *rawSink) bucketName() string { @@ -589,8 +578,6 @@ func newEachSink(to interface{}) (*eachSink, error) { } type eachSink struct { - skip int - limit int ref reflect.Value execFn func(interface{}) error } @@ -603,26 +590,8 @@ func (e *eachSink) bucketName() string { return reflect.Indirect(e.ref).Type().Name() } -func (e *eachSink) add(i *item) (bool, error) { - if e.limit == 0 { - return true, nil - } - - if e.skip > 0 { - e.skip-- - return false, nil - } - - if e.limit > 0 { - e.limit-- - } - - err := e.execFn(i.value.Interface()) - if err != nil { - return false, err - } - - return e.limit == 0, nil +func (e *eachSink) add(i *item) error { + return e.execFn(i.value.Interface()) } func (e *eachSink) flush() error { diff --git a/sink_sorter_swap.go b/sink_sorter_swap.go new file mode 100644 index 0000000..43af7cd --- /dev/null +++ b/sink_sorter_swap.go @@ -0,0 +1,22 @@ +// +build !go1.8 + +package storm + +import "reflect" + +func (s *sorter) Swap(i, j int) { + // skip if we encountered an earlier error + select { + case <-s.done: + return + default: + } + + if ssink, ok := s.sink.(sliceSink); ok { + x, y := ssink.slice().Index(i).Interface(), ssink.slice().Index(j).Interface() + ssink.slice().Index(i).Set(reflect.ValueOf(y)) + ssink.slice().Index(j).Set(reflect.ValueOf(x)) + } else { + s.list[i], s.list[j] = s.list[j], s.list[i] + } +} diff --git a/sink_sorter_swap_go1.8.go b/sink_sorter_swap_go1.8.go new file mode 100644 index 0000000..21bf7ae --- /dev/null +++ b/sink_sorter_swap_go1.8.go @@ -0,0 +1,20 @@ +// +build go1.8 + +package storm + +import "reflect" + +func (s *sorter) Swap(i, j int) { + // skip if we encountered an earlier error + select { + case <-s.done: + return + default: + } + + if ssink, ok := s.sink.(sliceSink); ok { + reflect.Swapper(ssink.slice().Interface())(i, j) + } else { + s.list[i], s.list[j] = s.list[j], s.list[i] + } +}