mirror of
https://github.com/harshabose/buffer.git
synced 2025-09-27 03:35:59 +08:00
removed nil checks
This commit is contained in:
@@ -9,8 +9,8 @@ import (
|
|||||||
|
|
||||||
type ChannelBuffer[T any] struct {
|
type ChannelBuffer[T any] struct {
|
||||||
pool Pool[T]
|
pool Pool[T]
|
||||||
bufferChannel chan *T
|
bufferChannel chan T
|
||||||
inputBuffer chan *T
|
inputBuffer chan T
|
||||||
closed bool
|
closed bool
|
||||||
mux sync.RWMutex
|
mux sync.RWMutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@@ -19,8 +19,8 @@ type ChannelBuffer[T any] struct {
|
|||||||
func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] {
|
func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] {
|
||||||
buffer := &ChannelBuffer[T]{
|
buffer := &ChannelBuffer[T]{
|
||||||
pool: pool,
|
pool: pool,
|
||||||
bufferChannel: make(chan *T, size),
|
bufferChannel: make(chan T, size),
|
||||||
inputBuffer: make(chan *T, size),
|
inputBuffer: make(chan T, size),
|
||||||
closed: false,
|
closed: false,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
@@ -28,7 +28,7 @@ func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *Ch
|
|||||||
return buffer
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element *T) error {
|
func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element T) error {
|
||||||
buffer.mux.RLock()
|
buffer.mux.RLock()
|
||||||
defer buffer.mux.RUnlock()
|
defer buffer.mux.RUnlock()
|
||||||
|
|
||||||
@@ -44,38 +44,40 @@ func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element *T) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (*T, error) {
|
func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (T, error) {
|
||||||
buffer.mux.RLock()
|
buffer.mux.RLock()
|
||||||
defer buffer.mux.RUnlock()
|
defer buffer.mux.RUnlock()
|
||||||
|
|
||||||
|
var zero T
|
||||||
|
|
||||||
if buffer.closed {
|
if buffer.closed {
|
||||||
return nil, errors.New("buffer closed")
|
return zero, errors.New("buffer closed")
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return zero, ctx.Err()
|
||||||
case data, ok := <-buffer.bufferChannel:
|
case data, ok := <-buffer.bufferChannel:
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ErrorChannelBufferClose
|
return zero, ErrorChannelBufferClose
|
||||||
}
|
|
||||||
if data == nil {
|
|
||||||
return nil, ErrorElementUnallocated
|
|
||||||
}
|
}
|
||||||
|
// if data == nil {
|
||||||
|
// return zero, ErrorElementUnallocated
|
||||||
|
// }
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *ChannelBuffer[T]) Generate() *T {
|
func (buffer *ChannelBuffer[T]) Generate() T {
|
||||||
return buffer.pool.Get()
|
return buffer.pool.Get()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *ChannelBuffer[T]) PutBack(element *T) {
|
func (buffer *ChannelBuffer[T]) PutBack(element T) {
|
||||||
if buffer.pool != nil {
|
if buffer.pool != nil {
|
||||||
buffer.pool.Put(element)
|
buffer.pool.Put(element)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *ChannelBuffer[T]) GetChannel() chan *T {
|
func (buffer *ChannelBuffer[T]) GetChannel() chan T {
|
||||||
return buffer.bufferChannel
|
return buffer.bufferChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,7 +93,10 @@ loop:
|
|||||||
case <-buffer.ctx.Done():
|
case <-buffer.ctx.Done():
|
||||||
return
|
return
|
||||||
case element, ok := <-buffer.inputBuffer:
|
case element, ok := <-buffer.inputBuffer:
|
||||||
if !ok || element == nil {
|
// if !ok || element == nil {
|
||||||
|
// continue loop
|
||||||
|
// }
|
||||||
|
if !ok {
|
||||||
continue loop
|
continue loop
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
|
@@ -3,22 +3,22 @@ package buffer
|
|||||||
import "context"
|
import "context"
|
||||||
|
|
||||||
type Pool[T any] interface {
|
type Pool[T any] interface {
|
||||||
Get() *T
|
Get() T
|
||||||
Put(*T)
|
Put(T)
|
||||||
Release()
|
Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
type Buffer[T any] interface {
|
type Buffer[T any] interface {
|
||||||
Push(context.Context, *T) error
|
Push(context.Context, T) error
|
||||||
Pop(ctx context.Context) (*T, error)
|
Pop(ctx context.Context) (T, error)
|
||||||
Size() int
|
Size() int
|
||||||
}
|
}
|
||||||
|
|
||||||
type BufferWithGenerator[T any] interface {
|
type BufferWithGenerator[T any] interface {
|
||||||
Push(context.Context, *T) error
|
Push(context.Context, T) error
|
||||||
Pop(ctx context.Context) (*T, error)
|
Pop(ctx context.Context) (T, error)
|
||||||
Size() int
|
Size() int
|
||||||
Generate() *T
|
Generate() T
|
||||||
PutBack(*T)
|
PutBack(T)
|
||||||
GetChannel() chan *T
|
GetChannel() chan T
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user