ringbuffer: consume buffer before closing

This commit is contained in:
aler9
2021-05-04 12:10:39 +02:00
committed by Alessandro Ros
parent 750583341a
commit 1a2dacadd8
2 changed files with 8 additions and 5 deletions

View File

@@ -54,13 +54,12 @@ func (r *RingBuffer) Push(data interface{}) {
// Pull pulls some data from the beginning of the buffer. // Pull pulls some data from the beginning of the buffer.
func (r *RingBuffer) Pull() (interface{}, bool) { func (r *RingBuffer) Pull() (interface{}, bool) {
for { for {
if atomic.SwapInt64(&r.closed, 0) == 1 {
return nil, false
}
i := r.readIndex % r.bufferSize i := r.readIndex % r.bufferSize
res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil)) res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil))
if res == nil { if res == nil {
if atomic.SwapInt64(&r.closed, 0) == 1 {
return nil, false
}
r.event.wait() r.event.wait()
continue continue
} }

View File

@@ -46,11 +46,15 @@ func TestClose(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
_, ok := r.Pull() _, ok := r.Pull()
require.Equal(t, true, ok)
_, ok = r.Pull()
require.Equal(t, false, ok) require.Equal(t, false, ok)
}() }()
time.Sleep(100 * time.Millisecond) r.Push([]byte{0x01, 0x02, 0x03, 0x04})
r.Close() r.Close()
<-done <-done