diff --git a/pkg/ringbuffer/event.go b/pkg/ringbuffer/event.go deleted file mode 100644 index 0b2bea5f..00000000 --- a/pkg/ringbuffer/event.go +++ /dev/null @@ -1,38 +0,0 @@ -package ringbuffer - -import ( - "sync" -) - -type event struct { - mutex sync.Mutex - cond *sync.Cond - value bool -} - -func newEvent() *event { - cv := &event{} - cv.cond = sync.NewCond(&cv.mutex) - return cv -} - -func (cv *event) signal() { - func() { - cv.mutex.Lock() - defer cv.mutex.Unlock() - cv.value = true - }() - - cv.cond.Broadcast() -} - -func (cv *event) wait() { - cv.mutex.Lock() - defer cv.mutex.Unlock() - - if !cv.value { - cv.cond.Wait() - } - - cv.value = false -} diff --git a/pkg/ringbuffer/ringbuffer.go b/pkg/ringbuffer/ringbuffer.go index 8e9fb7ad..31f8c90e 100644 --- a/pkg/ringbuffer/ringbuffer.go +++ b/pkg/ringbuffer/ringbuffer.go @@ -3,18 +3,18 @@ package ringbuffer import ( "fmt" - "sync/atomic" - "unsafe" + "sync" ) // RingBuffer is a ring buffer. type RingBuffer struct { size uint64 + mutex sync.Mutex + cond *sync.Cond + buffer []interface{} readIndex uint64 writeIndex uint64 - closed int64 - buffer []unsafe.Pointer - event *event + closed bool } // New allocates a RingBuffer. @@ -25,53 +25,73 @@ func New(size uint64) (*RingBuffer, error) { return nil, fmt.Errorf("size must be a power of two") } - return &RingBuffer{ - size: size, - readIndex: 1, - writeIndex: 0, - buffer: make([]unsafe.Pointer, size), - event: newEvent(), - }, nil + r := &RingBuffer{ + size: size, + buffer: make([]interface{}, size), + } + + r.cond = sync.NewCond(&r.mutex) + + return r, nil } // Close makes Pull() return false. func (r *RingBuffer) Close() { - atomic.StoreInt64(&r.closed, 1) - r.event.signal() + r.mutex.Lock() + r.closed = true + r.mutex.Unlock() + r.cond.Broadcast() } // Reset restores Pull() behavior after a Close(). func (r *RingBuffer) Reset() { for i := uint64(0); i < r.size; i++ { - atomic.SwapPointer(&r.buffer[i], nil) + r.buffer[i] = nil } - atomic.SwapUint64(&r.writeIndex, 0) - r.readIndex = 1 - atomic.StoreInt64(&r.closed, 0) + r.writeIndex = 0 + r.readIndex = 0 + r.closed = false } // Push pushes data at the end of the buffer. -func (r *RingBuffer) Push(data interface{}) { - writeIndex := atomic.AddUint64(&r.writeIndex, 1) - i := writeIndex % r.size - atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data)) - r.event.signal() +func (r *RingBuffer) Push(data interface{}) bool { + r.mutex.Lock() + + if r.buffer[r.writeIndex] != nil { + r.mutex.Unlock() + return false + } + + r.buffer[r.writeIndex] = data + r.writeIndex = (r.writeIndex + 1) % r.size + r.mutex.Unlock() + + r.cond.Broadcast() + + return true } // Pull pulls data from the beginning of the buffer. func (r *RingBuffer) Pull() (interface{}, bool) { for { - i := r.readIndex % r.size - res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil)) - if res == nil { - if atomic.SwapInt64(&r.closed, 0) == 1 { - return nil, false - } - r.event.wait() - continue + r.mutex.Lock() + + data := r.buffer[r.readIndex] + + if data != nil { + r.buffer[r.readIndex] = nil + r.readIndex = (r.readIndex + 1) % r.size + r.mutex.Unlock() + return data, true } - r.readIndex++ - return *res, true + if r.closed { + r.mutex.Unlock() + return nil, false + } + + r.cond.Wait() + + r.mutex.Unlock() } } diff --git a/pkg/ringbuffer/ringbuffer_test.go b/pkg/ringbuffer/ringbuffer_test.go index 25a9635a..1c20a5f7 100644 --- a/pkg/ringbuffer/ringbuffer_test.go +++ b/pkg/ringbuffer/ringbuffer_test.go @@ -18,12 +18,12 @@ func TestPushBeforePull(t *testing.T) { require.NoError(t, err) defer r.Close() - data := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 1024/4) + ok := r.Push(bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4)) + require.Equal(t, true, ok) - r.Push(data) ret, ok := r.Pull() require.Equal(t, true, ok) - require.Equal(t, data, ret) + require.Equal(t, bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4), ret) } func TestPullBeforePush(t *testing.T) { @@ -31,19 +31,19 @@ func TestPullBeforePush(t *testing.T) { require.NoError(t, err) defer r.Close() - data := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 1024/4) - done := make(chan struct{}) go func() { defer close(done) ret, ok := r.Pull() require.Equal(t, true, ok) - require.Equal(t, data, ret) + require.Equal(t, bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4), ret) }() time.Sleep(100 * time.Millisecond) - r.Push(data) + ok := r.Push(bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4)) + require.Equal(t, true, ok) + <-done } @@ -62,17 +62,37 @@ func TestClose(t *testing.T) { require.Equal(t, false, ok) }() - r.Push([]byte{0x01, 0x02, 0x03, 0x04}) + ok := r.Push([]byte{1, 2, 3, 4}) + require.Equal(t, true, ok) r.Close() <-done r.Reset() - r.Push([]byte{0x05, 0x06, 0x07, 0x08}) - - _, ok := r.Pull() + ok = r.Push([]byte{5, 6, 7, 8}) require.Equal(t, true, ok) + + _, ok = r.Pull() + require.Equal(t, true, ok) +} + +func TestOverflow(t *testing.T) { + r, err := New(32) + require.NoError(t, err) + + for i := 0; i < 32; i++ { + r.Push([]byte{1, 2, 3, 4}) + } + + ok := r.Push([]byte{5, 6, 7, 8}) + require.Equal(t, false, ok) + + for i := 0; i < 32; i++ { + data, ok := r.Pull() + require.Equal(t, true, ok) + require.Equal(t, []byte{1, 2, 3, 4}, data) + } } func BenchmarkPushPullContinuous(b *testing.B) { @@ -123,7 +143,7 @@ func BenchmarkPushPullPaused5(b *testing.B) { } func BenchmarkPushPullPaused10(b *testing.B) { - r, _ := New(1024 * 8) + r, _ := New(128) defer r.Close() data := make([]byte, 1024)