diff --git a/orm_query.go b/orm_query.go index ca69e12..eea97c8 100644 --- a/orm_query.go +++ b/orm_query.go @@ -387,7 +387,6 @@ func (dba *Orm) _valueFromStruct(bindResult reflect.Value, field string) (v inte // Chunk : 分块处理数据,当要处理很多数据的时候, 我不需要知道具体是多少数据, 我只需要每次取limit条数据, // 然后不断的增加offset去取更多数据, 从而达到分块处理更多数据的目的 -// TODO 后续增加 gorotine 支持, 提高批量数据处理效率, 预计需要增加获取更多链接的支持 func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) { var page = 1 var tabname = dba.GetISession().GetIBinder().GetBindName() @@ -419,36 +418,68 @@ func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) { return } -// ChunkWG : ChunkWG是在Chunk的基础上,新增带有多线程性质的处理方式,理论性能将会有30%左右的提升,需要处理的数据量越大越多,比原版Chunk的提升就越明显 -// 使用ChunkWG处理大量数据,请尽量保证有足够多的连接数,避免影响程序其他需要数据库的部分连接进入等待状态 -func (dba *Orm) ChunkWG(limit int, callback func([]Data) error) (err error) { - //var page = 1 - //var tabname = dba.GetISession().GetIBinder().GetBindName() - //prefix := dba.GetISession().GetIBinder().GetBindPrefix() - //tabname2 := strings.TrimPrefix(tabname, prefix) - //where, fields, group := dba.where, dba.fields, dba.group - // - //// 先执行一条看看是否报错, 同时设置指定的limit, offset - //dba.Table(tabname2).Limit(limit).Page(page) - //if err = dba.Select(); err != nil { - // return - //} - //result := dba.GetBindAll() - //for len(result) > 0 { - // if err = callback(result); err != nil { - // break - // } - // page++ - // // 清理绑定数据, 进行下一次操作, 因为绑定数据是每一次执行的时候都会解析并保存的 - // // 而第二次以后执行的, 都会再次解析并保存, 数据结构是slice, 故会累积起来 - // dba.ClearBindValues() - // dba.where, dba.fields, dba.group = where, fields, group - // dba.Table(tabname2).Limit(limit).Page(page) - // if err = dba.Select(); err != nil { - // break - // } - // result = dba.GetBindAll() - //} +// ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分 +// 原理与PaginatorWG方法类似,保留了事务隔离性并且加入了 +func (dba *Orm) ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error) { + if onetime_exec_thread <= 0 { + onetime_exec_thread = 1 + } + if onetime_exec_thread > 20 { + onetime_exec_thread = 20 + } + var page = 1 + tabname := dba.GetISession().GetIBinder().GetBindName() + prefix := dba.GetISession().GetIBinder().GetBindPrefix() + tabname2 := strings.TrimPrefix(tabname, prefix) + where, fields, group := dba.where, dba.fields, dba.group + dba.Table(tabname2).Limit(limit).Page(page) + if err = dba.Select(); err != nil { + return + } + result := dba.GetBindAll() + if len(result) < 1 { + return + } + continue_run := true + for continue_run { + var mp map[string][]interface{} + for i := 0; i < onetime_exec_thread; i++ { + page++ + dba.ClearBindValues() + dba.where, dba.fields, dba.group = where, fields, group + dba.Table(tabname2).Limit(limit).Page(page) + sqlStr, args, err := dba.BuildSql() + if err != nil { + continue_run = false + return err + } + mp[sqlStr] = args + } + var wg sync.WaitGroup + wg.Add(len(mp)) + for sqlStr, args := range mp { + go func(sql string, arg []interface{}) { + _result, _err := dba.Query(sql, arg...) + if _err != nil { + wg.Done() + continue_run = false + logger.Error(_err.Error()) + return + } + if len(_result) < 1 { + wg.Done() + continue_run = false + return + } + if _err = callback(_result); _err != nil { + wg.Done() + continue_run = false + return + } + }(sqlStr, args) + } + wg.Wait() + } return } diff --git a/orm_query_interface.go b/orm_query_interface.go index c1fa74b..9f059fd 100644 --- a/orm_query_interface.go +++ b/orm_query_interface.go @@ -37,6 +37,9 @@ type IOrmQuery interface { // 例如,我们可以将处理全部 users 表数据分割成一次处理 100 条记录的小组块 // 你可以通过从闭包函数中返回 err 来终止组块的运行 Chunk(limit int, callback func([]Data) error) (err error) + + // ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分 + ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error) // 跟Chunk类似,只不过callback的是传入的结构体 ChunkStruct(limit int, callback func() error) (err error) Loop(limit int, callback func([]Data) error) (err error)