add pop timeout

This commit is contained in:
xxj
2020-12-18 13:14:02 +08:00
parent bec3d3ae50
commit 956424c3ec
2 changed files with 81 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"
"gopkg.in/eapache/queue.v1"
)
@@ -15,6 +16,8 @@ type MyQueue struct {
buffer *queue.Queue
closed bool
count int32
cc chan interface{}
once sync.Once
}
//New 创建
@@ -29,7 +32,6 @@ func New() *MyQueue {
//Pop 取出队列,(阻塞模式)
func (q *MyQueue) Pop() (v interface{}) {
c := q.popable
buffer := q.buffer
q.Mutex.Lock()
defer q.Mutex.Unlock()
@@ -43,6 +45,7 @@ func (q *MyQueue) Pop() (v interface{}) {
}
if q.Len() > 0 {
buffer := q.buffer
v = buffer.Peek()
buffer.Remove()
atomic.AddInt32(&q.count, -1)
@@ -50,7 +53,7 @@ func (q *MyQueue) Pop() (v interface{}) {
return
}
//试着取出队列非阻塞模式返回ok == false 表示空
// TryPop 试着取出队列非阻塞模式返回ok == false 表示空
func (q *MyQueue) TryPop() (v interface{}, ok bool) {
buffer := q.buffer
@@ -69,6 +72,57 @@ func (q *MyQueue) TryPop() (v interface{}, ok bool) {
return
}
// TryPopTimeout 试着取出队列(塞模式+timeout返回ok == false 表示超时
func (q *MyQueue) TryPopTimeout(tm time.Duration) (v interface{}, ok bool) {
q.once.Do(func() {
q.cc = make(chan interface{}, 1)
})
go func() {
q.popChan(&q.cc)
}()
ok = true
timeout := time.After(tm)
select {
case v = <-q.cc:
case <-timeout:
if !q.closed {
q.popable.Signal()
}
ok = false
}
return
}
//Pop 取出队列,(阻塞模式)
func (q *MyQueue) popChan(v *chan interface{}) {
c := q.popable
q.Mutex.Lock()
defer q.Mutex.Unlock()
for q.Len() == 0 && !q.closed {
c.Wait()
}
if q.closed { //已关闭
*v <- nil
return
}
if q.Len() > 0 {
buffer := q.buffer
tmp := buffer.Peek()
buffer.Remove()
atomic.AddInt32(&q.count, -1)
*v <- tmp
} else {
*v <- nil
}
return
}
// 插入队列,非阻塞
func (q *MyQueue) Push(v interface{}) {
q.Mutex.Lock()

View File

@@ -103,3 +103,28 @@ func TestTry(t *testing.T) {
que.Wait()
fmt.Println("down")
}
func TestTimeout(t *testing.T) {
que := New()
go func() {
for i := 0; i < 10; i++ { //开启20个请求
time.Sleep(1 * time.Second)
que.Push(i)
}
}()
go func() {
for {
b, ok := que.TryPopTimeout(100 * time.Microsecond)
if ok {
fmt.Println(b.(int))
} else {
fmt.Println("time out")
}
}
}()
time.Sleep(200 * time.Second)
que.Wait()
fmt.Println("down")
}