Reduce memory usage and copies by 50% for listSink when using OrderBy

This commit is contained in:
Peter Fern
2017-05-06 18:00:54 +10:00
parent dbf518c94e
commit 789bde7563
5 changed files with 166 additions and 167 deletions

View File

@@ -58,6 +58,7 @@ func (n *node) One(fieldName string, value interface{}, to interface{}) error {
field, ok := cfg.Fields[fieldName]
if !ok || (!field.IsID && field.Index == "") {
query := newQuery(n, q.StrictEq(fieldName, value))
query.Limit(1)
if n.tx != nil {
err = query.query(n.tx, sink)
@@ -141,9 +142,8 @@ func (n *node) Find(fieldName string, value interface{}, to interface{}, options
field, ok := cfg.Fields[fieldName]
if !ok || (!field.IsID && (field.Index == "" || value == nil)) {
sink.limit = opts.Limit
sink.skip = opts.Skip
query := newQuery(n, q.Eq(fieldName, value))
query.Skip(opts.Skip).Limit(opts.Limit)
if opts.Reverse {
query.Reverse()
@@ -190,14 +190,14 @@ func (n *node) find(tx *bolt.Tx, bucketName, fieldName string, cfg *structConfig
sink.results = reflect.MakeSlice(reflect.Indirect(sink.ref).Type(), len(list), len(list))
sorter := newSorter(n, sink, nil, nil, false)
sorter := newSorter(n, sink)
for i := range list {
raw := bucket.Get(list[i])
if raw == nil {
return ErrNotFound
}
if _, err := sorter.filter(bucket, list[i], raw); err != nil {
if _, err := sorter.filter(nil, bucket, list[i], raw); err != nil {
return err
}
}
@@ -337,9 +337,8 @@ func (n *node) Range(fieldName string, min, max, to interface{}, options ...func
field, ok := cfg.Fields[fieldName]
if !ok || (!field.IsID && field.Index == "") {
sink.limit = opts.Limit
sink.skip = opts.Skip
query := newQuery(n, q.And(q.Gte(fieldName, min), q.Lte(fieldName, max)))
query.Skip(opts.Skip).Limit(opts.Limit)
if opts.Reverse {
query.Reverse()
@@ -389,14 +388,14 @@ func (n *node) rnge(tx *bolt.Tx, bucketName, fieldName string, cfg *structConfig
}
sink.results = reflect.MakeSlice(reflect.Indirect(sink.ref).Type(), len(list), len(list))
sorter := newSorter(n, sink, nil, nil, false)
sorter := newSorter(n, sink)
for i := range list {
raw := bucket.Get(list[i])
if raw == nil {
return ErrNotFound
}
if _, err := sorter.filter(bucket, list[i], raw); err != nil {
if _, err := sorter.filter(nil, bucket, list[i], raw); err != nil {
return err
}
}

View File

@@ -101,8 +101,6 @@ func (q *query) Find(to interface{}) error {
return err
}
sink.limit = q.limit
sink.skip = q.skip
return q.runQuery(sink)
}
@@ -112,7 +110,7 @@ func (q *query) First(to interface{}) error {
return err
}
sink.skip = q.skip
q.limit = 1
return q.runQuery(sink)
}
@@ -122,9 +120,6 @@ func (q *query) Delete(kind interface{}) error {
return err
}
sink.limit = q.limit
sink.skip = q.skip
return q.runQuery(sink)
}
@@ -134,9 +129,6 @@ func (q *query) Count(kind interface{}) (int, error) {
return 0, err
}
sink.limit = q.limit
sink.skip = q.skip
err = q.runQuery(sink)
if err != nil {
return 0, err
@@ -148,9 +140,6 @@ func (q *query) Count(kind interface{}) (int, error) {
func (q *query) Raw() ([][]byte, error) {
sink := newRawSink()
sink.limit = q.limit
sink.skip = q.skip
err := q.runQuery(sink)
if err != nil {
return nil, err
@@ -162,8 +151,6 @@ func (q *query) Raw() ([][]byte, error) {
func (q *query) RawEach(fn func([]byte, []byte) error) error {
sink := newRawSink()
sink.limit = q.limit
sink.skip = q.skip
sink.execFn = fn
return q.runQuery(sink)
@@ -175,8 +162,6 @@ func (q *query) Each(kind interface{}, fn func(interface{}) error) error {
return err
}
sink.limit = q.limit
sink.skip = q.skip
sink.execFn = fn
return q.runQuery(sink)
@@ -207,7 +192,11 @@ func (q *query) query(tx *bolt.Tx, sink sink) error {
return sink.flush()
}
sorter := newSorter(q.node, sink, q.tree, q.orderBy, q.reverse)
sorter := newSorter(q.node, sink)
sorter.orderBy = q.orderBy
sorter.reverse = q.reverse
sorter.skip = q.skip
sorter.limit = q.limit
if bucket != nil {
c := internal.Cursor{C: bucket.Cursor(), Reverse: q.reverse}
for k, v := c.First(); k != nil; k, v = c.Next() {
@@ -215,7 +204,7 @@ func (q *query) query(tx *bolt.Tx, sink sink) error {
continue
}
stop, err := sorter.filter(bucket, k, v)
stop, err := sorter.filter(q.tree, bucket, k, v)
if err != nil {
return err
}

251
sink.go
View File

@@ -16,58 +16,49 @@ type item struct {
v []byte
}
func newSorter(n Node, snk sink, tree q.Matcher, orderBy []string, reverse bool) *sorter {
func newSorter(n Node, snk sink) *sorter {
return &sorter{
node: n,
sink: snk,
tree: tree,
orderBy: orderBy,
reverse: reverse,
list: make([]*item, 0),
err: make(chan error),
done: make(chan struct{}),
node: n,
sink: snk,
skip: 0,
limit: -1,
list: make([]*item, 0),
err: make(chan error),
done: make(chan struct{}),
}
}
type sorter struct {
node Node
sink sink
tree q.Matcher
list []*item
skip int
limit int
orderBy []string
reverse bool
err chan error
done chan struct{}
}
func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) {
func (s *sorter) filter(tree q.Matcher, bucket *bolt.Bucket, k, v []byte) (bool, error) {
itm := &item{
bucket: bucket,
k: k,
v: v,
}
rsink, ok := s.sink.(reflectSink)
if !ok {
return s.sink.add(&item{
bucket: bucket,
k: k,
v: v,
})
return s.add(itm)
}
newElem := rsink.elem()
if err := s.node.Codec().Unmarshal(v, newElem.Interface()); err != nil {
return false, err
}
itm.value = &newElem
itm := &item{
bucket: bucket,
value: &newElem,
k: k,
v: v,
}
if s.tree == nil {
if len(s.orderBy) == 0 {
return s.sink.add(itm)
}
} else {
ok, err := s.tree.Match(newElem.Interface())
if tree != nil {
ok, err := tree.Match(newElem.Interface())
if err != nil {
return false, err
}
@@ -77,7 +68,12 @@ func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) {
}
if len(s.orderBy) == 0 {
return s.sink.add(itm)
return s.add(itm)
}
if _, ok := s.sink.(sliceSink); ok {
// add directly to sink, we'll apply skip/limits after sorting
return false, s.sink.add(itm)
}
s.list = append(s.list, itm)
@@ -85,6 +81,25 @@ func (s *sorter) filter(bucket *bolt.Bucket, k, v []byte) (bool, error) {
return false, nil
}
func (s *sorter) add(itm *item) (stop bool, err error) {
if s.limit == 0 {
return true, nil
}
if s.skip > 0 {
s.skip--
return false, nil
}
if s.limit > 0 {
s.limit--
}
err = s.sink.add(itm)
return s.limit == 0, err
}
func (s *sorter) compareValue(left reflect.Value, right reflect.Value) int {
if !left.IsValid() || !right.IsValid() {
if left.IsValid() {
@@ -195,11 +210,31 @@ func (s *sorter) flush() error {
return err
}
if ssink, ok := s.sink.(sliceSink); ok {
if !ssink.slice().IsValid() {
return s.sink.flush()
}
skip := s.skip
if s.skip >= ssink.slice().Len() {
ssink.reset()
return s.sink.flush()
}
if skip < 0 {
skip = 0
}
limit := s.limit
if skip+limit > ssink.slice().Len() || limit < 1 {
limit = ssink.slice().Len()
}
ssink.setSlice(ssink.slice().Slice(skip, limit))
return s.sink.flush()
}
for _, itm := range s.list {
if itm == nil {
break
}
stop, err := s.sink.add(itm)
stop, err := s.add(itm)
if err != nil {
return err
}
@@ -217,18 +252,12 @@ func (s *sorter) Len() int {
case <-s.done:
return 0
default:
return len(s.list)
}
}
if ssink, ok := s.sink.(sliceSink); ok {
return ssink.slice().Len()
}
return len(s.list)
func (s *sorter) Swap(i, j int) {
// skip if we encountered an earlier error
select {
case <-s.done:
return
default:
s.list[i], s.list[j] = s.list[j], s.list[i]
}
}
func (s *sorter) Less(i, j int) bool {
@@ -239,13 +268,16 @@ func (s *sorter) Less(i, j int) bool {
default:
}
if ssink, ok := s.sink.(sliceSink); ok {
return s.less(ssink.slice().Index(i), ssink.slice().Index(j))
}
return s.less(*s.list[i].value, *s.list[j].value)
}
type sink interface {
bucketName() string
flush() error
add(*item) (bool, error)
add(*item) error
readOnly() bool
}
@@ -253,6 +285,12 @@ type reflectSink interface {
elem() reflect.Value
}
type sliceSink interface {
slice() reflect.Value
setSlice(reflect.Value)
reset()
}
func newListSink(node Node, to interface{}) (*listSink, error) {
ref := reflect.ValueOf(to)
@@ -277,7 +315,6 @@ func newListSink(node Node, to interface{}) (*listSink, error) {
isPtr: sliceType.Elem().Kind() == reflect.Ptr,
elemType: elemType,
name: elemType.Name(),
limit: -1,
results: reflect.MakeSlice(reflect.Indirect(ref).Type(), 0, 0),
}, nil
}
@@ -289,11 +326,21 @@ type listSink struct {
elemType reflect.Type
name string
isPtr bool
skip int
limit int
idx int
}
func (l *listSink) slice() reflect.Value {
return l.results
}
func (l *listSink) setSlice(s reflect.Value) {
l.results = s
}
func (l *listSink) reset() {
l.results = reflect.MakeSlice(reflect.Indirect(l.ref).Type(), 0, 0)
}
func (l *listSink) elem() reflect.Value {
if l.results.IsValid() && l.idx < l.results.Len() {
return l.results.Index(l.idx).Addr()
@@ -305,20 +352,7 @@ func (l *listSink) bucketName() string {
return l.name
}
func (l *listSink) add(i *item) (bool, error) {
if l.limit == 0 {
return true, nil
}
if l.skip > 0 {
l.skip--
return false, nil
}
if l.limit > 0 {
l.limit--
}
func (l *listSink) add(i *item) error {
if l.idx == l.results.Len() {
if l.isPtr {
l.results = reflect.Append(l.results, *i.value)
@@ -329,7 +363,7 @@ func (l *listSink) add(i *item) (bool, error) {
l.idx++
return l.limit == 0, nil
return nil
}
func (l *listSink) flush() error {
@@ -361,7 +395,6 @@ func newFirstSink(node Node, to interface{}) (*firstSink, error) {
type firstSink struct {
node Node
ref reflect.Value
skip int
found bool
}
@@ -373,15 +406,10 @@ func (f *firstSink) bucketName() string {
return reflect.Indirect(f.ref).Type().Name()
}
func (f *firstSink) add(i *item) (bool, error) {
if f.skip > 0 {
f.skip--
return false, nil
}
func (f *firstSink) add(i *item) error {
reflect.Indirect(f.ref).Set(i.value.Elem())
f.found = true
return true, nil
return nil
}
func (f *firstSink) flush() error {
@@ -412,8 +440,6 @@ func newDeleteSink(node Node, kind interface{}) (*deleteSink, error) {
type deleteSink struct {
node Node
ref reflect.Value
skip int
limit int
removed int
}
@@ -425,19 +451,10 @@ func (d *deleteSink) bucketName() string {
return reflect.Indirect(d.ref).Type().Name()
}
func (d *deleteSink) add(i *item) (bool, error) {
if d.skip > 0 {
d.skip--
return false, nil
}
if d.limit > 0 {
d.limit--
}
func (d *deleteSink) add(i *item) error {
info, err := extract(&d.ref)
if err != nil {
return false, err
return err
}
for fieldName, fieldCfg := range info.Fields {
@@ -446,20 +463,20 @@ func (d *deleteSink) add(i *item) (bool, error) {
}
idx, err := getIndex(i.bucket, fieldCfg.Index, fieldName)
if err != nil {
return false, err
return err
}
err = idx.RemoveID(i.k)
if err != nil {
if err == index.ErrNotFound {
return false, ErrNotFound
return ErrNotFound
}
return false, err
return err
}
}
d.removed++
return d.limit == 0, i.bucket.Delete(i.k)
return i.bucket.Delete(i.k)
}
func (d *deleteSink) flush() error {
@@ -490,8 +507,6 @@ func newCountSink(node Node, kind interface{}) (*countSink, error) {
type countSink struct {
node Node
ref reflect.Value
skip int
limit int
counter int
}
@@ -503,18 +518,9 @@ func (c *countSink) bucketName() string {
return reflect.Indirect(c.ref).Type().Name()
}
func (c *countSink) add(i *item) (bool, error) {
if c.skip > 0 {
c.skip--
return false, nil
}
if c.limit > 0 {
c.limit--
}
func (c *countSink) add(i *item) error {
c.counter++
return c.limit == 0, nil
return nil
}
func (c *countSink) flush() error {
@@ -526,42 +532,25 @@ func (c *countSink) readOnly() bool {
}
func newRawSink() *rawSink {
return &rawSink{
limit: -1,
}
return &rawSink{}
}
type rawSink struct {
results [][]byte
skip int
limit int
execFn func([]byte, []byte) error
}
func (r *rawSink) add(i *item) (bool, error) {
if r.limit == 0 {
return true, nil
}
if r.skip > 0 {
r.skip--
return false, nil
}
if r.limit > 0 {
r.limit--
}
func (r *rawSink) add(i *item) error {
if r.execFn != nil {
err := r.execFn(i.k, i.v)
if err != nil {
return false, err
return err
}
} else {
r.results = append(r.results, i.v)
}
return r.limit == 0, nil
return nil
}
func (r *rawSink) bucketName() string {
@@ -589,8 +578,6 @@ func newEachSink(to interface{}) (*eachSink, error) {
}
type eachSink struct {
skip int
limit int
ref reflect.Value
execFn func(interface{}) error
}
@@ -603,26 +590,8 @@ func (e *eachSink) bucketName() string {
return reflect.Indirect(e.ref).Type().Name()
}
func (e *eachSink) add(i *item) (bool, error) {
if e.limit == 0 {
return true, nil
}
if e.skip > 0 {
e.skip--
return false, nil
}
if e.limit > 0 {
e.limit--
}
err := e.execFn(i.value.Interface())
if err != nil {
return false, err
}
return e.limit == 0, nil
func (e *eachSink) add(i *item) error {
return e.execFn(i.value.Interface())
}
func (e *eachSink) flush() error {

22
sink_sorter_swap.go Normal file
View File

@@ -0,0 +1,22 @@
// +build !go1.8
package storm
import "reflect"
func (s *sorter) Swap(i, j int) {
// skip if we encountered an earlier error
select {
case <-s.done:
return
default:
}
if ssink, ok := s.sink.(sliceSink); ok {
x, y := ssink.slice().Index(i).Interface(), ssink.slice().Index(j).Interface()
ssink.slice().Index(i).Set(reflect.ValueOf(y))
ssink.slice().Index(j).Set(reflect.ValueOf(x))
} else {
s.list[i], s.list[j] = s.list[j], s.list[i]
}
}

20
sink_sorter_swap_go1.8.go Normal file
View File

@@ -0,0 +1,20 @@
// +build go1.8
package storm
import "reflect"
func (s *sorter) Swap(i, j int) {
// skip if we encountered an earlier error
select {
case <-s.done:
return
default:
}
if ssink, ok := s.sink.(sliceSink); ok {
reflect.Swapper(ssink.slice().Interface())(i, j)
} else {
s.list[i], s.list[j] = s.list[j], s.list[i]
}
}