commit 34413c88ecaf3a7ff2a4155f1154a58c7e7b0149 Author: harshabose Date: Mon Feb 24 17:40:44 2025 +0530 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bffbb23 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/third-party +.idea \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2c0b0d1 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/harshabose/tools/buffer + +go 1.23.3 \ No newline at end of file diff --git a/pkg/errors.go b/pkg/errors.go new file mode 100644 index 0000000..74507d5 --- /dev/null +++ b/pkg/errors.go @@ -0,0 +1,8 @@ +package buffer + +import "errors" + +var ( + ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage") + ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation") +) diff --git a/pkg/limit_buffer.go b/pkg/limit_buffer.go new file mode 100644 index 0000000..8673cf5 --- /dev/null +++ b/pkg/limit_buffer.go @@ -0,0 +1,117 @@ +package buffer + +import ( + "context" + "fmt" +) + +type ChannelBuffer[T any] struct { + pool Pool[T] + bufferChannel chan *T + inputBuffer chan *T + ctx context.Context +} + +func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] { + buffer := &ChannelBuffer[T]{ + pool: pool, + bufferChannel: make(chan *T, size), + inputBuffer: make(chan *T), + ctx: ctx, + } + go buffer.loop() + return buffer +} + +func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element *T) error { + select { + case buffer.inputBuffer <- element: + // WARN: LACKS CHECKS FOR CLOSED CHANNEL + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (*T, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case data, ok := <-buffer.bufferChannel: + if !ok { + return nil, ErrorChannelBufferClose + } + if data == nil { + return nil, ErrorElementUnallocated + } + return data, nil + } +} + +func (buffer *ChannelBuffer[T]) Generate() *T { + return buffer.pool.Get() +} + +func (buffer *ChannelBuffer[T]) PutBack(element *T) { + if buffer.pool != nil { + buffer.pool.Put(element) + } +} + +func (buffer *ChannelBuffer[T]) GetChannel() chan *T { + return buffer.bufferChannel +} + +func (buffer *ChannelBuffer[T]) Size() int { + return len(buffer.bufferChannel) +} + +func (buffer *ChannelBuffer[T]) loop() { + defer buffer.close() +loop: + for { + select { + case <-buffer.ctx.Done(): + return + case element, ok := <-buffer.inputBuffer: + if !ok || element == nil { + continue loop + } + select { + case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED + continue loop + default: + select { + case oldElement := <-buffer.bufferChannel: + buffer.PutBack(oldElement) + select { + case buffer.bufferChannel <- element: + continue loop + default: + fmt.Println("unexpected buffer state. skipping the element..") + buffer.PutBack(element) + } + } + } + } + } +} + +func (buffer *ChannelBuffer[T]) close() { +loop: + for { + select { + case element := <-buffer.bufferChannel: + if buffer.pool != nil { + buffer.pool.Put(element) + } + default: + close(buffer.bufferChannel) + close(buffer.inputBuffer) + break loop + } + } + if buffer.pool != nil { + buffer.pool.Release() + } +} diff --git a/pkg/package.go b/pkg/package.go new file mode 100644 index 0000000..9a2b7a7 --- /dev/null +++ b/pkg/package.go @@ -0,0 +1,24 @@ +package buffer + +import "context" + +type Pool[T any] interface { + Get() *T + Put(*T) + Release() +} + +type Buffer[T any] interface { + Push(context.Context, *T) error + Pop(ctx context.Context) (*T, error) + Size() int +} + +type BufferWithGenerator[T any] interface { + Push(context.Context, *T) error + Pop(ctx context.Context) (*T, error) + Size() int + Generate() *T + PutBack(*T) + GetChannel() chan *T +}