diff --git a/pkg/ringbuffer/event.go b/pkg/ringbuffer/event.go new file mode 100644 index 00000000..0b2bea5f --- /dev/null +++ b/pkg/ringbuffer/event.go @@ -0,0 +1,38 @@ +package ringbuffer + +import ( + "sync" +) + +type event struct { + mutex sync.Mutex + cond *sync.Cond + value bool +} + +func newEvent() *event { + cv := &event{} + cv.cond = sync.NewCond(&cv.mutex) + return cv +} + +func (cv *event) signal() { + func() { + cv.mutex.Lock() + defer cv.mutex.Unlock() + cv.value = true + }() + + cv.cond.Broadcast() +} + +func (cv *event) wait() { + cv.mutex.Lock() + defer cv.mutex.Unlock() + + if !cv.value { + cv.cond.Wait() + } + + cv.value = false +} diff --git a/pkg/ringbuffer/ringbuffer.go b/pkg/ringbuffer/ringbuffer.go index e000cc3d..6e68510c 100644 --- a/pkg/ringbuffer/ringbuffer.go +++ b/pkg/ringbuffer/ringbuffer.go @@ -2,7 +2,6 @@ package ringbuffer import ( "sync/atomic" - "time" "unsafe" ) @@ -13,6 +12,7 @@ type RingBuffer struct { writeIndex uint64 closed int64 buffer []unsafe.Pointer + event *event } // New allocates a RingBuffer. @@ -22,12 +22,14 @@ func New(size uint64) *RingBuffer { readIndex: 1, writeIndex: 0, buffer: make([]unsafe.Pointer, size), + event: newEvent(), } } // Close makes Pull() return false. func (r *RingBuffer) Close() { atomic.StoreInt64(&r.closed, 1) + r.event.signal() } // Reset restores Pull(). @@ -45,6 +47,7 @@ func (r *RingBuffer) Push(data interface{}) { writeIndex := atomic.AddUint64(&r.writeIndex, 1) i := writeIndex % r.bufferSize atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data)) + r.event.signal() } // Pull pulls some data from the beginning of the buffer. @@ -57,7 +60,7 @@ func (r *RingBuffer) Pull() (interface{}, bool) { i := r.readIndex % r.bufferSize res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil)) if res == nil { - time.Sleep(10 * time.Millisecond) + r.event.wait() continue } diff --git a/pkg/ringbuffer/ringbuffer_test.go b/pkg/ringbuffer/ringbuffer_test.go index 9259a325..e692d200 100644 --- a/pkg/ringbuffer/ringbuffer_test.go +++ b/pkg/ringbuffer/ringbuffer_test.go @@ -42,6 +42,22 @@ func TestPullBeforePush(t *testing.T) { <-done } +func TestClose(t *testing.T) { + r := New(1024) + + done := make(chan struct{}) + go func() { + defer close(done) + _, ok := r.Pull() + require.Equal(t, false, ok) + }() + + time.Sleep(100 * time.Millisecond) + + r.Close() + <-done +} + func BenchmarkPushPullContinuous(b *testing.B) { r := New(1024 * 8) defer r.Close() diff --git a/testimages/rtsp-simple-server/Dockerfile b/testimages/rtsp-simple-server/Dockerfile index 7875157e..98a6c2c0 100644 --- a/testimages/rtsp-simple-server/Dockerfile +++ b/testimages/rtsp-simple-server/Dockerfile @@ -3,9 +3,8 @@ FROM amd64/golang:1.15-alpine3.12 AS server RUN apk --no-cache add git -RUN git clone https://github.com/aler9/rtsp-simple-server - -RUN cd rtsp-simple-server \ +RUN git clone -b v0.14.2 https://github.com/aler9/rtsp-simple-server \ + && cd rtsp-simple-server \ && CGO_ENABLED=0 \ go build -o /rtsp-simple-server .