Files
gortsplib/pkg/ringbuffer/ringbuffer.go

68 lines
1.4 KiB
Go

package ringbuffer
import (
"sync/atomic"
"time"
"unsafe"
)
// RingBuffer is a ring buffer.
type RingBuffer struct {
bufferSize uint64
readIndex uint64
writeIndex uint64
closed int64
buffer []unsafe.Pointer
}
// New allocates a RingBuffer.
func New(size uint64) *RingBuffer {
return &RingBuffer{
bufferSize: size,
readIndex: 1,
writeIndex: 0,
buffer: make([]unsafe.Pointer, size),
}
}
// Close makes Pull() return false.
func (r *RingBuffer) Close() {
atomic.StoreInt64(&r.closed, 1)
}
// Reset restores Pull().
func (r *RingBuffer) Reset() {
for i := uint64(0); i < r.bufferSize; i++ {
atomic.SwapPointer(&r.buffer[i], nil)
}
atomic.SwapUint64(&r.writeIndex, 0)
r.readIndex = 1
atomic.StoreInt64(&r.closed, 0)
}
// Push pushes some data at the end of the buffer.
func (r *RingBuffer) Push(data interface{}) {
writeIndex := atomic.AddUint64(&r.writeIndex, 1)
i := writeIndex % r.bufferSize
atomic.SwapPointer(&r.buffer[i], unsafe.Pointer(&data))
}
// Pull pulls some data from the beginning of the buffer.
func (r *RingBuffer) Pull() (interface{}, bool) {
for {
if atomic.SwapInt64(&r.closed, 0) == 1 {
return nil, false
}
i := r.readIndex % r.bufferSize
res := (*interface{})(atomic.SwapPointer(&r.buffer[i], nil))
if res == nil {
time.Sleep(10 * time.Millisecond)
continue
}
r.readIndex++
return *res, true
}
}