diff --git a/pkg/ringbuffer/ringbuffer.go b/pkg/ringbuffer/ringbuffer.go index b505e410..999c6e39 100644 --- a/pkg/ringbuffer/ringbuffer.go +++ b/pkg/ringbuffer/ringbuffer.go @@ -54,13 +54,12 @@ func (r *RingBuffer) Push(data interface{}) { // Pull pulls some data from the beginning of the buffer. func (r *RingBuffer) Pull() (interface{}, bool) { for { - if atomic.SwapInt64(&r.closed, 0) == 1 { - return nil, false - } - i := r.readIndex % r.bufferSize res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil)) if res == nil { + if atomic.SwapInt64(&r.closed, 0) == 1 { + return nil, false + } r.event.wait() continue } diff --git a/pkg/ringbuffer/ringbuffer_test.go b/pkg/ringbuffer/ringbuffer_test.go index c86a0736..9736a02d 100644 --- a/pkg/ringbuffer/ringbuffer_test.go +++ b/pkg/ringbuffer/ringbuffer_test.go @@ -46,11 +46,15 @@ func TestClose(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) + _, ok := r.Pull() + require.Equal(t, true, ok) + + _, ok = r.Pull() require.Equal(t, false, ok) }() - time.Sleep(100 * time.Millisecond) + r.Push([]byte{0x01, 0x02, 0x03, 0x04}) r.Close() <-done