ringbuffer: when buffer is full, preserve queued data (#386)

This commit is contained in:
Alessandro Ros
2023-08-26 17:14:04 +02:00
committed by GitHub
parent e87c6d66e5
commit bf3de06b3c
3 changed files with 85 additions and 83 deletions

View File

@@ -1,38 +0,0 @@
package ringbuffer
import (
"sync"
)
// event is a resettable, broadcast-style notification primitive:
// signal() records a pending notification and wakes all waiters;
// wait() blocks until a notification is pending, then consumes it.
type event struct {
	mutex sync.Mutex  // guards value
	cond  *sync.Cond  // signaled when value becomes true; uses mutex as its Locker
	value bool        // true while a notification is pending
}
// newEvent allocates an event whose condition variable is bound to the
// event's own mutex.
func newEvent() *event {
	ev := new(event)
	ev.cond = sync.NewCond(&ev.mutex)
	return ev
}
// signal marks a notification as pending and wakes every goroutine
// currently blocked in wait(). The flag is set under the mutex; the
// Broadcast itself happens after the lock is released, which sync.Cond
// permits.
func (cv *event) signal() {
	cv.mutex.Lock()
	cv.value = true
	cv.mutex.Unlock()

	cv.cond.Broadcast()
}
// wait blocks until a notification is pending, then consumes it by
// clearing the flag before returning.
//
// The condition is re-checked in a loop, as the sync.Cond documentation
// requires: Wait can return without the condition holding (spurious
// wakeup, or another waiter consuming the flag first after a Broadcast
// wakes several goroutines). The original single `if` check could let a
// waiter return with no signal pending.
func (cv *event) wait() {
	cv.mutex.Lock()
	defer cv.mutex.Unlock()
	for !cv.value {
		cv.cond.Wait()
	}
	cv.value = false
}

View File

@@ -3,18 +3,18 @@ package ringbuffer
import ( import (
"fmt" "fmt"
"sync/atomic" "sync"
"unsafe"
) )
// RingBuffer is a ring buffer. // RingBuffer is a ring buffer.
type RingBuffer struct { type RingBuffer struct {
size uint64 size uint64
mutex sync.Mutex
cond *sync.Cond
buffer []interface{}
readIndex uint64 readIndex uint64
writeIndex uint64 writeIndex uint64
closed int64 closed bool
buffer []unsafe.Pointer
event *event
} }
// New allocates a RingBuffer. // New allocates a RingBuffer.
@@ -25,53 +25,73 @@ func New(size uint64) (*RingBuffer, error) {
return nil, fmt.Errorf("size must be a power of two") return nil, fmt.Errorf("size must be a power of two")
} }
return &RingBuffer{ r := &RingBuffer{
size: size, size: size,
readIndex: 1, buffer: make([]interface{}, size),
writeIndex: 0, }
buffer: make([]unsafe.Pointer, size),
event: newEvent(), r.cond = sync.NewCond(&r.mutex)
}, nil
return r, nil
} }
// Close makes Pull() return false. // Close makes Pull() return false.
func (r *RingBuffer) Close() { func (r *RingBuffer) Close() {
atomic.StoreInt64(&r.closed, 1) r.mutex.Lock()
r.event.signal() r.closed = true
r.mutex.Unlock()
r.cond.Broadcast()
} }
// Reset restores Pull() behavior after a Close(). // Reset restores Pull() behavior after a Close().
func (r *RingBuffer) Reset() { func (r *RingBuffer) Reset() {
for i := uint64(0); i < r.size; i++ { for i := uint64(0); i < r.size; i++ {
atomic.SwapPointer(&r.buffer[i], nil) r.buffer[i] = nil
} }
atomic.SwapUint64(&r.writeIndex, 0) r.writeIndex = 0
r.readIndex = 1 r.readIndex = 0
atomic.StoreInt64(&r.closed, 0) r.closed = false
} }
// Push pushes data at the end of the buffer. // Push pushes data at the end of the buffer.
func (r *RingBuffer) Push(data interface{}) { func (r *RingBuffer) Push(data interface{}) bool {
writeIndex := atomic.AddUint64(&r.writeIndex, 1) r.mutex.Lock()
i := writeIndex % r.size
atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data)) if r.buffer[r.writeIndex] != nil {
r.event.signal() r.mutex.Unlock()
return false
}
r.buffer[r.writeIndex] = data
r.writeIndex = (r.writeIndex + 1) % r.size
r.mutex.Unlock()
r.cond.Broadcast()
return true
} }
// Pull pulls data from the beginning of the buffer. // Pull pulls data from the beginning of the buffer.
func (r *RingBuffer) Pull() (interface{}, bool) { func (r *RingBuffer) Pull() (interface{}, bool) {
for { for {
i := r.readIndex % r.size r.mutex.Lock()
res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil))
if res == nil { data := r.buffer[r.readIndex]
if atomic.SwapInt64(&r.closed, 0) == 1 {
return nil, false if data != nil {
} r.buffer[r.readIndex] = nil
r.event.wait() r.readIndex = (r.readIndex + 1) % r.size
continue r.mutex.Unlock()
return data, true
} }
r.readIndex++ if r.closed {
return *res, true r.mutex.Unlock()
return nil, false
}
r.cond.Wait()
r.mutex.Unlock()
} }
} }

View File

@@ -18,12 +18,12 @@ func TestPushBeforePull(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer r.Close() defer r.Close()
data := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 1024/4) ok := r.Push(bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4))
require.Equal(t, true, ok)
r.Push(data)
ret, ok := r.Pull() ret, ok := r.Pull()
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, data, ret) require.Equal(t, bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4), ret)
} }
func TestPullBeforePush(t *testing.T) { func TestPullBeforePush(t *testing.T) {
@@ -31,19 +31,19 @@ func TestPullBeforePush(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer r.Close() defer r.Close()
data := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 1024/4)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
ret, ok := r.Pull() ret, ok := r.Pull()
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, data, ret) require.Equal(t, bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4), ret)
}() }()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
r.Push(data) ok := r.Push(bytes.Repeat([]byte{1, 2, 3, 4}, 1024/4))
require.Equal(t, true, ok)
<-done <-done
} }
@@ -62,17 +62,37 @@ func TestClose(t *testing.T) {
require.Equal(t, false, ok) require.Equal(t, false, ok)
}() }()
r.Push([]byte{0x01, 0x02, 0x03, 0x04}) ok := r.Push([]byte{1, 2, 3, 4})
require.Equal(t, true, ok)
r.Close() r.Close()
<-done <-done
r.Reset() r.Reset()
r.Push([]byte{0x05, 0x06, 0x07, 0x08}) ok = r.Push([]byte{5, 6, 7, 8})
_, ok := r.Pull()
require.Equal(t, true, ok) require.Equal(t, true, ok)
_, ok = r.Pull()
require.Equal(t, true, ok)
}
func TestOverflow(t *testing.T) {
r, err := New(32)
require.NoError(t, err)
for i := 0; i < 32; i++ {
r.Push([]byte{1, 2, 3, 4})
}
ok := r.Push([]byte{5, 6, 7, 8})
require.Equal(t, false, ok)
for i := 0; i < 32; i++ {
data, ok := r.Pull()
require.Equal(t, true, ok)
require.Equal(t, []byte{1, 2, 3, 4}, data)
}
} }
func BenchmarkPushPullContinuous(b *testing.B) { func BenchmarkPushPullContinuous(b *testing.B) {
@@ -123,7 +143,7 @@ func BenchmarkPushPullPaused5(b *testing.B) {
} }
func BenchmarkPushPullPaused10(b *testing.B) { func BenchmarkPushPullPaused10(b *testing.B) {
r, _ := New(1024 * 8) r, _ := New(128)
defer r.Close() defer r.Close()
data := make([]byte, 1024) data := make([]byte, 1024)