mirror of
https://github.com/xxjwxc/public.git
synced 2025-09-27 04:06:03 +08:00
169 lines
2.7 KiB
Go
169 lines
2.7 KiB
Go
package myqueue
|
||
|
||
import (
|
||
"runtime"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"gopkg.in/eapache/queue.v1"
|
||
)
|
||
|
||
//MyQueue queue
|
||
type MyQueue struct {
|
||
sync.Mutex
|
||
popable *sync.Cond
|
||
buffer *queue.Queue
|
||
closed bool
|
||
count int32
|
||
cc chan interface{}
|
||
once sync.Once
|
||
}
|
||
|
||
//New 创建
|
||
func New() *MyQueue {
|
||
ch := &MyQueue{
|
||
buffer: queue.New(),
|
||
}
|
||
ch.popable = sync.NewCond(&ch.Mutex)
|
||
return ch
|
||
}
|
||
|
||
//Pop 取出队列,(阻塞模式)
|
||
func (q *MyQueue) Pop() (v interface{}) {
|
||
c := q.popable
|
||
|
||
q.Mutex.Lock()
|
||
defer q.Mutex.Unlock()
|
||
|
||
for q.Len() == 0 && !q.closed {
|
||
c.Wait()
|
||
}
|
||
|
||
if q.closed { //已关闭
|
||
return
|
||
}
|
||
|
||
if q.Len() > 0 {
|
||
buffer := q.buffer
|
||
v = buffer.Peek()
|
||
buffer.Remove()
|
||
atomic.AddInt32(&q.count, -1)
|
||
}
|
||
return
|
||
}
|
||
|
||
// TryPop 试着取出队列(非阻塞模式)返回ok == false 表示空
|
||
func (q *MyQueue) TryPop() (v interface{}, ok bool) {
|
||
buffer := q.buffer
|
||
|
||
q.Mutex.Lock()
|
||
defer q.Mutex.Unlock()
|
||
|
||
if q.Len() > 0 {
|
||
v = buffer.Peek()
|
||
buffer.Remove()
|
||
atomic.AddInt32(&q.count, -1)
|
||
ok = true
|
||
} else if q.closed {
|
||
ok = true
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// TryPopTimeout 试着取出队列(塞模式+timeout)返回ok == false 表示超时
|
||
func (q *MyQueue) TryPopTimeout(tm time.Duration) (v interface{}, ok bool) {
|
||
q.once.Do(func() {
|
||
q.cc = make(chan interface{}, 1)
|
||
})
|
||
go func() {
|
||
q.popChan(&q.cc)
|
||
}()
|
||
|
||
ok = true
|
||
timeout := time.After(tm)
|
||
select {
|
||
case v = <-q.cc:
|
||
case <-timeout:
|
||
if !q.closed {
|
||
q.popable.Signal()
|
||
}
|
||
ok = false
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
//Pop 取出队列,(阻塞模式)
|
||
func (q *MyQueue) popChan(v *chan interface{}) {
|
||
c := q.popable
|
||
|
||
q.Mutex.Lock()
|
||
defer q.Mutex.Unlock()
|
||
|
||
for q.Len() == 0 && !q.closed {
|
||
c.Wait()
|
||
}
|
||
|
||
if q.closed { //已关闭
|
||
*v <- nil
|
||
return
|
||
}
|
||
|
||
if q.Len() > 0 {
|
||
buffer := q.buffer
|
||
tmp := buffer.Peek()
|
||
buffer.Remove()
|
||
atomic.AddInt32(&q.count, -1)
|
||
*v <- tmp
|
||
} else {
|
||
*v <- nil
|
||
}
|
||
return
|
||
}
|
||
|
||
// Push 插入队列,非阻塞
|
||
func (q *MyQueue) Push(v interface{}) {
|
||
q.Mutex.Lock()
|
||
defer q.Mutex.Unlock()
|
||
if !q.closed {
|
||
q.buffer.Add(v)
|
||
atomic.AddInt32(&q.count, 1)
|
||
q.popable.Signal()
|
||
}
|
||
}
|
||
|
||
// Len 获取队列长度
|
||
func (q *MyQueue) Len() int {
|
||
return (int)(atomic.LoadInt32(&q.count))
|
||
}
|
||
|
||
// Close MyQueue
|
||
// After close, Pop will return nil without block, and TryPop will return v=nil, ok=True
|
||
func (q *MyQueue) Close() {
|
||
q.Mutex.Lock()
|
||
defer q.Mutex.Unlock()
|
||
if !q.closed {
|
||
q.closed = true
|
||
atomic.StoreInt32(&q.count, 0)
|
||
q.popable.Broadcast() //广播
|
||
}
|
||
}
|
||
|
||
// IsClose check is closed
|
||
func (q *MyQueue) IsClose() bool {
|
||
return q.closed
|
||
}
|
||
|
||
//Wait 等待队列消费完成
|
||
func (q *MyQueue) Wait() {
|
||
for {
|
||
if q.closed || q.Len() == 0 {
|
||
break
|
||
}
|
||
|
||
runtime.Gosched() //出让时间片
|
||
}
|
||
}
|