first commit

This commit is contained in:
harshabose
2025-02-24 17:40:44 +05:30
commit 34413c88ec
5 changed files with 154 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/third-party
.idea

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module github.com/harshabose/tools/buffer
go 1.23.3

8
pkg/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package buffer
import "errors"
var (
ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage")
ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation")
)

117
pkg/limit_buffer.go Normal file
View File

@@ -0,0 +1,117 @@
package buffer
import (
"context"
"fmt"
)
type ChannelBuffer[T any] struct {
pool Pool[T]
bufferChannel chan *T
inputBuffer chan *T
ctx context.Context
}
func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] {
buffer := &ChannelBuffer[T]{
pool: pool,
bufferChannel: make(chan *T, size),
inputBuffer: make(chan *T),
ctx: ctx,
}
go buffer.loop()
return buffer
}
func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element *T) error {
select {
case buffer.inputBuffer <- element:
// WARN: LACKS CHECKS FOR CLOSED CHANNEL
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (*T, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case data, ok := <-buffer.bufferChannel:
if !ok {
return nil, ErrorChannelBufferClose
}
if data == nil {
return nil, ErrorElementUnallocated
}
return data, nil
}
}
func (buffer *ChannelBuffer[T]) Generate() *T {
return buffer.pool.Get()
}
func (buffer *ChannelBuffer[T]) PutBack(element *T) {
if buffer.pool != nil {
buffer.pool.Put(element)
}
}
func (buffer *ChannelBuffer[T]) GetChannel() chan *T {
return buffer.bufferChannel
}
func (buffer *ChannelBuffer[T]) Size() int {
return len(buffer.bufferChannel)
}
func (buffer *ChannelBuffer[T]) loop() {
defer buffer.close()
loop:
for {
select {
case <-buffer.ctx.Done():
return
case element, ok := <-buffer.inputBuffer:
if !ok || element == nil {
continue loop
}
select {
case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED
continue loop
default:
select {
case oldElement := <-buffer.bufferChannel:
buffer.PutBack(oldElement)
select {
case buffer.bufferChannel <- element:
continue loop
default:
fmt.Println("unexpected buffer state. skipping the element..")
buffer.PutBack(element)
}
}
}
}
}
}
func (buffer *ChannelBuffer[T]) close() {
loop:
for {
select {
case element := <-buffer.bufferChannel:
if buffer.pool != nil {
buffer.pool.Put(element)
}
default:
close(buffer.bufferChannel)
close(buffer.inputBuffer)
break loop
}
}
if buffer.pool != nil {
buffer.pool.Release()
}
}

24
pkg/package.go Normal file
View File

@@ -0,0 +1,24 @@
package buffer
import "context"
type Pool[T any] interface {
Get() *T
Put(*T)
Release()
}
type Buffer[T any] interface {
Push(context.Context, *T) error
Pop(ctx context.Context) (*T, error)
Size() int
}
type BufferWithGenerator[T any] interface {
Push(context.Context, *T) error
Pop(ctx context.Context) (*T, error)
Size() int
Generate() *T
PutBack(*T)
GetChannel() chan *T
}