ringbuffer: eliminate idle load by using condition variables instead of sleeps

This commit is contained in:
aler9
2021-03-05 20:14:57 +01:00
parent 5f15e8e3b6
commit 0b43bd2f19
4 changed files with 61 additions and 5 deletions

38
pkg/ringbuffer/event.go Normal file
View File

@@ -0,0 +1,38 @@
package ringbuffer
import (
"sync"
)
type event struct {
mutex sync.Mutex
cond *sync.Cond
value bool
}
func newEvent() *event {
cv := &event{}
cv.cond = sync.NewCond(&cv.mutex)
return cv
}
func (cv *event) signal() {
func() {
cv.mutex.Lock()
defer cv.mutex.Unlock()
cv.value = true
}()
cv.cond.Broadcast()
}
func (cv *event) wait() {
cv.mutex.Lock()
defer cv.mutex.Unlock()
if !cv.value {
cv.cond.Wait()
}
cv.value = false
}

View File

@@ -2,7 +2,6 @@ package ringbuffer
import ( import (
"sync/atomic" "sync/atomic"
"time"
"unsafe" "unsafe"
) )
@@ -13,6 +12,7 @@ type RingBuffer struct {
writeIndex uint64 writeIndex uint64
closed int64 closed int64
buffer []unsafe.Pointer buffer []unsafe.Pointer
event *event
} }
// New allocates a RingBuffer. // New allocates a RingBuffer.
@@ -22,12 +22,14 @@ func New(size uint64) *RingBuffer {
readIndex: 1, readIndex: 1,
writeIndex: 0, writeIndex: 0,
buffer: make([]unsafe.Pointer, size), buffer: make([]unsafe.Pointer, size),
event: newEvent(),
} }
} }
// Close makes Pull() return false. // Close makes Pull() return false.
func (r *RingBuffer) Close() { func (r *RingBuffer) Close() {
atomic.StoreInt64(&r.closed, 1) atomic.StoreInt64(&r.closed, 1)
r.event.signal()
} }
// Reset restores Pull(). // Reset restores Pull().
@@ -45,6 +47,7 @@ func (r *RingBuffer) Push(data interface{}) {
writeIndex := atomic.AddUint64(&r.writeIndex, 1) writeIndex := atomic.AddUint64(&r.writeIndex, 1)
i := writeIndex % r.bufferSize i := writeIndex % r.bufferSize
atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data)) atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data))
r.event.signal()
} }
// Pull pulls some data from the beginning of the buffer. // Pull pulls some data from the beginning of the buffer.
@@ -57,7 +60,7 @@ func (r *RingBuffer) Pull() (interface{}, bool) {
i := r.readIndex % r.bufferSize i := r.readIndex % r.bufferSize
res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil)) res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil))
if res == nil { if res == nil {
time.Sleep(10 * time.Millisecond) r.event.wait()
continue continue
} }

View File

@@ -42,6 +42,22 @@ func TestPullBeforePush(t *testing.T) {
<-done <-done
} }
func TestClose(t *testing.T) {
r := New(1024)
done := make(chan struct{})
go func() {
defer close(done)
_, ok := r.Pull()
require.Equal(t, false, ok)
}()
time.Sleep(100 * time.Millisecond)
r.Close()
<-done
}
func BenchmarkPushPullContinuous(b *testing.B) { func BenchmarkPushPullContinuous(b *testing.B) {
r := New(1024 * 8) r := New(1024 * 8)
defer r.Close() defer r.Close()

View File

@@ -3,9 +3,8 @@ FROM amd64/golang:1.15-alpine3.12 AS server
RUN apk --no-cache add git RUN apk --no-cache add git
RUN git clone https://github.com/aler9/rtsp-simple-server RUN git clone -b v0.14.2 https://github.com/aler9/rtsp-simple-server \
&& cd rtsp-simple-server \
RUN cd rtsp-simple-server \
&& CGO_ENABLED=0 \ && CGO_ENABLED=0 \
go build -o /rtsp-simple-server . go build -o /rtsp-simple-server .