From 4001846f995768a7a3813ef139943650bb7059ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E5=B0=8F=E5=86=9B?= <346944475@qq.com> Date: Wed, 3 Jun 2020 22:11:44 +0800 Subject: [PATCH] new --- go.mod | 1 + go.sum | 3 + workerpool/def.go | 28 ------- workerpool/workerpool.go | 152 ---------------------------------- workerpool/workerpool_test.go | 9 +- 5 files changed, 9 insertions(+), 184 deletions(-) delete mode 100644 workerpool/def.go delete mode 100644 workerpool/workerpool.go diff --git a/go.mod b/go.mod index 765e15e..e73c45e 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.0.0 github.com/syndtr/goleveldb v1.0.0 + github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285 go.uber.org/zap v1.10.0 golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd golang.org/x/image v0.0.0-20200430140353-33d19683fad8 // indirect diff --git a/go.sum b/go.sum index 51ab825..4e1da0b 100644 --- a/go.sum +++ b/go.sum @@ -175,7 +175,10 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285 h1:gbdax2ZvHZwe8zxu7by/HMuDUS47iHR2zmEzlgAHBMw= +github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285/go.mod h1:yJ/fY5BorWARfDDsxBU/MyQTHc5MVyNcqBQQYD6MN0k= github.com/xxjwxc/oauth2 v0.0.0-20200316140438-ddd4e3c5f272 h1:aKA2UeM+nzEanWpzZWqkD3e12FlX6bRdhcOsNhm21X0= +github.com/xxjwxc/public v0.0.0-20200603115833-341beff27850/go.mod h1:fp3M+FEQrCgWD1fZ/PLwZkCTglf086OEhC9LcydAUnc= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/workerpool/def.go b/workerpool/def.go deleted file mode 100644 index 33cc2b0..0000000 --- a/workerpool/def.go +++ /dev/null @@ -1,28 +0,0 @@ -package workerpool - -import ( - "sync" - "time" - - "github.com/xxjwxc/public/myqueue" -) - -// TaskHandler process .定义函数回调体 -type TaskHandler func() error - -// workerPool serves incoming connections via a pool of workers -// in FILO order, i.e. the most recently stopped worker will serve the next -// incoming connection. -// -// Such a scheme keeps CPU caches hot (in theory). -type WorkerPool struct { - //sync.Mutex - //maxWorkersCount int //最大的工作协程数 - //start sync.Once - closed int32 - errChan chan error //错误chan - timeout time.Duration //最大超时时间 - wg sync.WaitGroup - task chan TaskHandler - waitingQueue *myqueue.MyQueue -} diff --git a/workerpool/workerpool.go b/workerpool/workerpool.go deleted file mode 100644 index fd2aea3..0000000 --- a/workerpool/workerpool.go +++ /dev/null @@ -1,152 +0,0 @@ -package workerpool - -import ( - "context" - "sync/atomic" - "time" - - "github.com/xxjwxc/public/myqueue" - - "github.com/xxjwxc/public/mylog" -) - -//New 注册工作池,并设置最大并发数 -//new workpool and set the max number of concurrencies -func New(max int) *WorkerPool { - if max < 1 { - max = 1 - } - - p := &WorkerPool{ - task: make(chan TaskHandler, 2*max), - errChan: make(chan error, 1), - waitingQueue: myqueue.New(), - } - - go p.loop(max) - return p -} - -//SetTimeout 设置超时时间 -func (p *WorkerPool) SetTimeout(timeout time.Duration) { - p.timeout = timeout -} - -//Add to the workpool and return immediately -//Do 添加到工作池,并立即返回 -func (p *WorkerPool) Do(fn TaskHandler) { - if p.IsClosed() { // 已关闭 - return - } - p.waitingQueue.Push(fn) - //p.task <- fn -} - -//Add to the workpool and wait for execution to complete before returning -//DoWait 添加到工作池,并等待执行完成之后再返回 -func (p *WorkerPool) DoWait(task TaskHandler) { - if p.IsClosed() { // closed - return - } - - doneChan := make(chan struct{}) - p.waitingQueue.Push(TaskHandler(func() error { - defer close(doneChan) - return task() - })) - <-doneChan -} - -//Waiting for the worker thread to finish executing -//Wait 等待工作线程执行结束 -func (p *WorkerPool) Wait() error { - p.waitingQueue.Wait() //等待队列结束 - close(p.task) - p.wg.Wait() //等待结束 - select { - case err := <-p.errChan: - return err - default: - return nil - } -} - -//Determine whether it is complete (non-blocking) -//IsDone 判断是否完成 (非阻塞) -func (p *WorkerPool) IsDone() bool { - if p == nil || p.task == nil { - return true - } - - return len(p.task) == 0 -} - -//Has it been closed? -//IsClosed 是否已经关闭 -func (p *WorkerPool) IsClosed() bool { - if atomic.LoadInt32(&p.closed) == 1 { // closed - return true - } - return false -} - -func (p *WorkerPool) startQueue() { - for { - fn := p.waitingQueue.Pop().(TaskHandler) - if p.IsClosed() { // closed - p.waitingQueue.Close() - break - } - - if fn != nil { - p.task <- fn - } - } -} - -func (p *WorkerPool) loop(maxWorkersCount int) { - go p.startQueue() //Startup queue , 启动队列 - - p.wg.Add(maxWorkersCount) // Maximum number of work cycles,最大的工作协程数 - //Start Max workers, 启动max个worker - for i := 0; i < maxWorkersCount; i++ { - go func() { - defer p.wg.Done() - // worker 开始干活 - for wt := range p.task { - if wt == nil || atomic.LoadInt32(&p.closed) == 1 { //returns immediately,有err 立即返回 - continue //It needs to be consumed before returning.需要先消费完了之后再返回, - } - - closed := make(chan struct{}, 1) - // Set timeout, priority task timeout.有设置超时,优先task 的超时 - if p.timeout > 0 { - ct, cancel := context.WithTimeout(context.Background(), p.timeout) - go func() { - select { - case <-ct.Done(): - p.errChan <- ct.Err() - //if atomic.LoadInt32(&p.closed) != 1 { - mylog.Error(ct.Err()) - atomic.StoreInt32(&p.closed, 1) - cancel() - case <-closed: - } - }() - } - - err := wt() //Points of Execution.真正执行的点 - close(closed) - if err != nil { - select { - case p.errChan <- err: - //if atomic.LoadInt32(&p.closed) != 1 { - mylog.Error(err) - atomic.StoreInt32(&p.closed, 1) - default: - } - } - } - }() - } -} diff --git a/workerpool/workerpool_test.go b/workerpool/workerpool_test.go index 947a25a..b7e9238 100644 --- a/workerpool/workerpool_test.go +++ b/workerpool/workerpool_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + "github.com/xxjwxc/gowp/workpool" "github.com/xxjwxc/public/errors" ) //template func TestWorkerPoolStart(t *testing.T) { - wp := New(10) //Set the maximum number of threads,设置最大线程数 + wp := workpool.New(10) //Set the maximum number of threads,设置最大线程数 for i := 0; i < 20; i++ { //Open 20 requests 开启20个请求 ii := i wp.Do(func() error { @@ -30,7 +31,7 @@ func TestWorkerPoolStart(t *testing.T) { //Support for error return //支持错误返回 func TestWorkerPoolError(t *testing.T) { - wp := New(10) //Set the maximum number of threads,设置最大线程数 + wp := workpool.New(10) //Set the maximum number of threads,设置最大线程数 for i := 0; i < 20; i++ { ii := i wp.Do(func() error { @@ -58,7 +59,7 @@ func TestWorkerPoolError(t *testing.T) { //Determine whether completion (non-blocking) is placed in the workpool and wait for execution results //放到工作池里面 且等待执行结果 func TestWorkerPoolDoWait(t *testing.T) { - wp := New(5) //Set the maximum number of threads,设置最大线程数 + wp := workpool.New(5) //Set the maximum number of threads,设置最大线程数 for i := 0; i < 10; i++ { ii := i wp.DoWait(func() error { @@ -86,7 +87,7 @@ func TestWorkerPoolDoWait(t *testing.T) { //Determine whether it is complete (non-blocking) //判断是否完成 (非阻塞) func TestWorkerPoolIsDone(t *testing.T) { - wp := New(5) //Set the maximum number of threads,设置最大线程数 + wp := workpool.New(5) //Set the maximum number of threads,设置最大线程数 for i := 0; i < 10; i++ { // ii := i wp.Do(func() error {