mirror of
https://github.com/tobycroft/gorose-pro.git
synced 2025-09-26 20:51:27 +08:00
v1.14.0 FIX/New
- 修复db.Chunk功能在limit为1时只能取到一条数据,当limit为多时又会一次性取出多条数据而不是一条一条获取一条一条处理的BUG - 新增db.ChunkWG多线程大量数据读取处理方法
This commit is contained in:
93
orm_query.go
93
orm_query.go
@@ -387,7 +387,6 @@ func (dba *Orm) _valueFromStruct(bindResult reflect.Value, field string) (v inte
|
|||||||
|
|
||||||
// Chunk : 分块处理数据,当要处理很多数据的时候, 我不需要知道具体是多少数据, 我只需要每次取limit条数据,
|
// Chunk : 分块处理数据,当要处理很多数据的时候, 我不需要知道具体是多少数据, 我只需要每次取limit条数据,
|
||||||
// 然后不断的增加offset去取更多数据, 从而达到分块处理更多数据的目的
|
// 然后不断的增加offset去取更多数据, 从而达到分块处理更多数据的目的
|
||||||
// TODO 后续增加 gorotine 支持, 提高批量数据处理效率, 预计需要增加获取更多链接的支持
|
|
||||||
func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) {
|
func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) {
|
||||||
var page = 1
|
var page = 1
|
||||||
var tabname = dba.GetISession().GetIBinder().GetBindName()
|
var tabname = dba.GetISession().GetIBinder().GetBindName()
|
||||||
@@ -419,36 +418,68 @@ func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChunkWG : ChunkWG是在Chunk的基础上,新增带有多线程性质的处理方式,理论性能将会有30%左右的提升,需要处理的数据量越大越多,比原版Chunk的提升就越明显
|
// ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分
|
||||||
// 使用ChunkWG处理大量数据,请尽量保证有足够多的连接数,避免影响程序其他需要数据库的部分连接进入等待状态
|
// 原理与PaginatorWG方法类似,保留了事务隔离性并且加入了
|
||||||
func (dba *Orm) ChunkWG(limit int, callback func([]Data) error) (err error) {
|
func (dba *Orm) ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error) {
|
||||||
//var page = 1
|
if onetime_exec_thread <= 0 {
|
||||||
//var tabname = dba.GetISession().GetIBinder().GetBindName()
|
onetime_exec_thread = 1
|
||||||
//prefix := dba.GetISession().GetIBinder().GetBindPrefix()
|
}
|
||||||
//tabname2 := strings.TrimPrefix(tabname, prefix)
|
if onetime_exec_thread > 20 {
|
||||||
//where, fields, group := dba.where, dba.fields, dba.group
|
onetime_exec_thread = 20
|
||||||
//
|
}
|
||||||
//// 先执行一条看看是否报错, 同时设置指定的limit, offset
|
var page = 1
|
||||||
//dba.Table(tabname2).Limit(limit).Page(page)
|
tabname := dba.GetISession().GetIBinder().GetBindName()
|
||||||
//if err = dba.Select(); err != nil {
|
prefix := dba.GetISession().GetIBinder().GetBindPrefix()
|
||||||
// return
|
tabname2 := strings.TrimPrefix(tabname, prefix)
|
||||||
//}
|
where, fields, group := dba.where, dba.fields, dba.group
|
||||||
//result := dba.GetBindAll()
|
dba.Table(tabname2).Limit(limit).Page(page)
|
||||||
//for len(result) > 0 {
|
if err = dba.Select(); err != nil {
|
||||||
// if err = callback(result); err != nil {
|
return
|
||||||
// break
|
}
|
||||||
// }
|
result := dba.GetBindAll()
|
||||||
// page++
|
if len(result) < 1 {
|
||||||
// // 清理绑定数据, 进行下一次操作, 因为绑定数据是每一次执行的时候都会解析并保存的
|
return
|
||||||
// // 而第二次以后执行的, 都会再次解析并保存, 数据结构是slice, 故会累积起来
|
}
|
||||||
// dba.ClearBindValues()
|
continue_run := true
|
||||||
// dba.where, dba.fields, dba.group = where, fields, group
|
for continue_run {
|
||||||
// dba.Table(tabname2).Limit(limit).Page(page)
|
var mp map[string][]interface{}
|
||||||
// if err = dba.Select(); err != nil {
|
for i := 0; i < onetime_exec_thread; i++ {
|
||||||
// break
|
page++
|
||||||
// }
|
dba.ClearBindValues()
|
||||||
// result = dba.GetBindAll()
|
dba.where, dba.fields, dba.group = where, fields, group
|
||||||
//}
|
dba.Table(tabname2).Limit(limit).Page(page)
|
||||||
|
sqlStr, args, err := dba.BuildSql()
|
||||||
|
if err != nil {
|
||||||
|
continue_run = false
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mp[sqlStr] = args
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(mp))
|
||||||
|
for sqlStr, args := range mp {
|
||||||
|
go func(sql string, arg []interface{}) {
|
||||||
|
_result, _err := dba.Query(sql, arg...)
|
||||||
|
if _err != nil {
|
||||||
|
wg.Done()
|
||||||
|
continue_run = false
|
||||||
|
logger.Error(_err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(_result) < 1 {
|
||||||
|
wg.Done()
|
||||||
|
continue_run = false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _err = callback(_result); _err != nil {
|
||||||
|
wg.Done()
|
||||||
|
continue_run = false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(sqlStr, args)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -37,6 +37,9 @@ type IOrmQuery interface {
|
|||||||
// 例如,我们可以将处理全部 users 表数据分割成一次处理 100 条记录的小组块
|
// 例如,我们可以将处理全部 users 表数据分割成一次处理 100 条记录的小组块
|
||||||
// 你可以通过从闭包函数中返回 err 来终止组块的运行
|
// 你可以通过从闭包函数中返回 err 来终止组块的运行
|
||||||
Chunk(limit int, callback func([]Data) error) (err error)
|
Chunk(limit int, callback func([]Data) error) (err error)
|
||||||
|
|
||||||
|
// ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分
|
||||||
|
ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error)
|
||||||
// 跟Chunk类似,只不过callback的是传入的结构体
|
// 跟Chunk类似,只不过callback的是传入的结构体
|
||||||
ChunkStruct(limit int, callback func() error) (err error)
|
ChunkStruct(limit int, callback func() error) (err error)
|
||||||
Loop(limit int, callback func([]Data) error) (err error)
|
Loop(limit int, callback func([]Data) error) (err error)
|
||||||
|
Reference in New Issue
Block a user