mirror of
https://github.com/xxjwxc/public.git
synced 2025-09-26 20:01:19 +08:00
new
This commit is contained in:
1
go.mod
1
go.mod
@@ -29,6 +29,7 @@ require (
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/spf13/cobra v1.0.0
|
||||
github.com/syndtr/goleveldb v1.0.0
|
||||
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285
|
||||
go.uber.org/zap v1.10.0
|
||||
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd
|
||||
golang.org/x/image v0.0.0-20200430140353-33d19683fad8 // indirect
|
||||
|
3
go.sum
3
go.sum
@@ -175,7 +175,10 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
|
||||
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285 h1:gbdax2ZvHZwe8zxu7by/HMuDUS47iHR2zmEzlgAHBMw=
|
||||
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285/go.mod h1:yJ/fY5BorWARfDDsxBU/MyQTHc5MVyNcqBQQYD6MN0k=
|
||||
github.com/xxjwxc/oauth2 v0.0.0-20200316140438-ddd4e3c5f272 h1:aKA2UeM+nzEanWpzZWqkD3e12FlX6bRdhcOsNhm21X0=
|
||||
github.com/xxjwxc/public v0.0.0-20200603115833-341beff27850/go.mod h1:fp3M+FEQrCgWD1fZ/PLwZkCTglf086OEhC9LcydAUnc=
|
||||
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
||||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
|
@@ -1,28 +0,0 @@
|
||||
package workerpool
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/xxjwxc/public/myqueue"
|
||||
)
|
||||
|
||||
// TaskHandler process .定义函数回调体
|
||||
type TaskHandler func() error
|
||||
|
||||
// workerPool serves incoming connections via a pool of workers
|
||||
// in FILO order, i.e. the most recently stopped worker will serve the next
|
||||
// incoming connection.
|
||||
//
|
||||
// Such a scheme keeps CPU caches hot (in theory).
|
||||
type WorkerPool struct {
|
||||
//sync.Mutex
|
||||
//maxWorkersCount int //最大的工作协程数
|
||||
//start sync.Once
|
||||
closed int32
|
||||
errChan chan error //错误chan
|
||||
timeout time.Duration //最大超时时间
|
||||
wg sync.WaitGroup
|
||||
task chan TaskHandler
|
||||
waitingQueue *myqueue.MyQueue
|
||||
}
|
@@ -1,152 +0,0 @@
|
||||
package workerpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/xxjwxc/public/myqueue"
|
||||
|
||||
"github.com/xxjwxc/public/mylog"
|
||||
)
|
||||
|
||||
//New 注册工作池,并设置最大并发数
|
||||
//new workpool and set the max number of concurrencies
|
||||
func New(max int) *WorkerPool {
|
||||
if max < 1 {
|
||||
max = 1
|
||||
}
|
||||
|
||||
p := &WorkerPool{
|
||||
task: make(chan TaskHandler, 2*max),
|
||||
errChan: make(chan error, 1),
|
||||
waitingQueue: myqueue.New(),
|
||||
}
|
||||
|
||||
go p.loop(max)
|
||||
return p
|
||||
}
|
||||
|
||||
//SetTimeout 设置超时时间
|
||||
func (p *WorkerPool) SetTimeout(timeout time.Duration) {
|
||||
p.timeout = timeout
|
||||
}
|
||||
|
||||
//Add to the workpool and return immediately
|
||||
//Do 添加到工作池,并立即返回
|
||||
func (p *WorkerPool) Do(fn TaskHandler) {
|
||||
if p.IsClosed() { // 已关闭
|
||||
return
|
||||
}
|
||||
p.waitingQueue.Push(fn)
|
||||
//p.task <- fn
|
||||
}
|
||||
|
||||
//Add to the workpool and wait for execution to complete before returning
|
||||
//DoWait 添加到工作池,并等待执行完成之后再返回
|
||||
func (p *WorkerPool) DoWait(task TaskHandler) {
|
||||
if p.IsClosed() { // closed
|
||||
return
|
||||
}
|
||||
|
||||
doneChan := make(chan struct{})
|
||||
p.waitingQueue.Push(TaskHandler(func() error {
|
||||
defer close(doneChan)
|
||||
return task()
|
||||
}))
|
||||
<-doneChan
|
||||
}
|
||||
|
||||
//Waiting for the worker thread to finish executing
|
||||
//Wait 等待工作线程执行结束
|
||||
func (p *WorkerPool) Wait() error {
|
||||
p.waitingQueue.Wait() //等待队列结束
|
||||
close(p.task)
|
||||
p.wg.Wait() //等待结束
|
||||
select {
|
||||
case err := <-p.errChan:
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//Determine whether it is complete (non-blocking)
|
||||
//IsDone 判断是否完成 (非阻塞)
|
||||
func (p *WorkerPool) IsDone() bool {
|
||||
if p == nil || p.task == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return len(p.task) == 0
|
||||
}
|
||||
|
||||
//Has it been closed?
|
||||
//IsClosed 是否已经关闭
|
||||
func (p *WorkerPool) IsClosed() bool {
|
||||
if atomic.LoadInt32(&p.closed) == 1 { // closed
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *WorkerPool) startQueue() {
|
||||
for {
|
||||
fn := p.waitingQueue.Pop().(TaskHandler)
|
||||
if p.IsClosed() { // closed
|
||||
p.waitingQueue.Close()
|
||||
break
|
||||
}
|
||||
|
||||
if fn != nil {
|
||||
p.task <- fn
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *WorkerPool) loop(maxWorkersCount int) {
|
||||
go p.startQueue() //Startup queue , 启动队列
|
||||
|
||||
p.wg.Add(maxWorkersCount) // Maximum number of work cycles,最大的工作协程数
|
||||
//Start Max workers, 启动max个worker
|
||||
for i := 0; i < maxWorkersCount; i++ {
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
// worker 开始干活
|
||||
for wt := range p.task {
|
||||
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { //returns immediately,有err 立即返回
|
||||
continue //It needs to be consumed before returning.需要先消费完了之后再返回,
|
||||
}
|
||||
|
||||
closed := make(chan struct{}, 1)
|
||||
// Set timeout, priority task timeout.有设置超时,优先task 的超时
|
||||
if p.timeout > 0 {
|
||||
ct, cancel := context.WithTimeout(context.Background(), p.timeout)
|
||||
go func() {
|
||||
select {
|
||||
case <-ct.Done():
|
||||
p.errChan <- ct.Err()
|
||||
//if atomic.LoadInt32(&p.closed) != 1 {
|
||||
mylog.Error(ct.Err())
|
||||
atomic.StoreInt32(&p.closed, 1)
|
||||
cancel()
|
||||
case <-closed:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
err := wt() //Points of Execution.真正执行的点
|
||||
close(closed)
|
||||
if err != nil {
|
||||
select {
|
||||
case p.errChan <- err:
|
||||
//if atomic.LoadInt32(&p.closed) != 1 {
|
||||
mylog.Error(err)
|
||||
atomic.StoreInt32(&p.closed, 1)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
@@ -5,12 +5,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/xxjwxc/gowp/workpool"
|
||||
"github.com/xxjwxc/public/errors"
|
||||
)
|
||||
|
||||
//template
|
||||
func TestWorkerPoolStart(t *testing.T) {
|
||||
wp := New(10) //Set the maximum number of threads,设置最大线程数
|
||||
wp := workpool.New(10) //Set the maximum number of threads,设置最大线程数
|
||||
for i := 0; i < 20; i++ { //Open 20 requests 开启20个请求
|
||||
ii := i
|
||||
wp.Do(func() error {
|
||||
@@ -30,7 +31,7 @@ func TestWorkerPoolStart(t *testing.T) {
|
||||
//Support for error return
|
||||
//支持错误返回
|
||||
func TestWorkerPoolError(t *testing.T) {
|
||||
wp := New(10) //Set the maximum number of threads,设置最大线程数
|
||||
wp := workpool.New(10) //Set the maximum number of threads,设置最大线程数
|
||||
for i := 0; i < 20; i++ {
|
||||
ii := i
|
||||
wp.Do(func() error {
|
||||
@@ -58,7 +59,7 @@ func TestWorkerPoolError(t *testing.T) {
|
||||
//Determine whether completion (non-blocking) is placed in the workpool and wait for execution results
|
||||
//放到工作池里面 且等待执行结果
|
||||
func TestWorkerPoolDoWait(t *testing.T) {
|
||||
wp := New(5) //Set the maximum number of threads,设置最大线程数
|
||||
wp := workpool.New(5) //Set the maximum number of threads,设置最大线程数
|
||||
for i := 0; i < 10; i++ {
|
||||
ii := i
|
||||
wp.DoWait(func() error {
|
||||
@@ -86,7 +87,7 @@ func TestWorkerPoolDoWait(t *testing.T) {
|
||||
//Determine whether it is complete (non-blocking)
|
||||
//判断是否完成 (非阻塞)
|
||||
func TestWorkerPoolIsDone(t *testing.T) {
|
||||
wp := New(5) //Set the maximum number of threads,设置最大线程数
|
||||
wp := workpool.New(5) //Set the maximum number of threads,设置最大线程数
|
||||
for i := 0; i < 10; i++ {
|
||||
// ii := i
|
||||
wp.Do(func() error {
|
||||
|
Reference in New Issue
Block a user