diff --git a/Makefile b/Makefile index 8f5ecba..3a63cce 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: t t: - go test -race -count=1 ./... + go test -race -count=1 . .PHONY: f f: diff --git a/cache.go b/cache.go index 074ad56..cce4f3c 100644 --- a/cache.go +++ b/cache.go @@ -43,6 +43,7 @@ type Cache[T any] struct { bucketMask uint32 deletables chan *Item[T] promotables chan *Item[T] + setables chan *Item[T] } // Create a new cache with the specified configuration @@ -56,6 +57,7 @@ func New[T any](config *Configuration[T]) *Cache[T] { buckets: make([]*bucket[T], config.buckets), deletables: make(chan *Item[T], config.deleteBuffer), promotables: make(chan *Item[T], config.promoteBuffer), + setables: make(chan *Item[T], config.setableBuffer), } for i := 0; i < config.buckets; i++ { c.buckets[i] = &bucket[T]{ @@ -196,7 +198,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool) if existing != nil { c.deletables <- existing } - c.promotables <- item + c.setables <- item return item } @@ -220,6 +222,8 @@ func (c *Cache[T]) worker() { select { case item := <-c.promotables: promoteItem(item) + case item := <-c.setables: + promoteItem(item) case item := <-c.deletables: c.doDelete(item) case control := <-cc: @@ -248,7 +252,7 @@ func (c *Cache[T]) worker() { dropped += c.gc() msg.done <- struct{}{} case controlSyncUpdates: - doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete) + doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete) msg.done <- struct{}{} } } @@ -272,6 +276,7 @@ drain: // that were already sent by the same goroutine. 
func doAllPendingPromotesAndDeletes[T any]( promotables <-chan *Item[T], + setables <-chan *Item[T], promoteFn func(*Item[T]), deletables <-chan *Item[T], deleteFn func(*Item[T]), @@ -281,6 +286,8 @@ doAllPromotes: select { case item := <-promotables: promoteFn(item) + case item := <-setables: + promoteFn(item) default: break doAllPromotes } diff --git a/cache_test.go b/cache_test.go index 050a1d8..87a77dc 100644 --- a/cache_test.go +++ b/cache_test.go @@ -361,6 +361,20 @@ func Test_ConcurrentStop(t *testing.T) { } } +func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) { + cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1)) + cache.Set("a", "1", time.Minute) + cache.Set("b", "2", time.Minute) + cache.Set("c", "3", time.Minute) + cache.Set("d", "4", time.Minute) + cache.Set("e", "5", time.Minute) + assert.Nil(t, cache.Get("a")) + // "b" could or could not be purged + assert.Equal(t, cache.Get("c").Value(), "3") + assert.Equal(t, cache.Get("d").Value(), "4") + assert.Equal(t, cache.Get("e").Value(), "5") +} + type SizedItem struct { id int s int64 diff --git a/configuration.go b/configuration.go index 3267680..b0a8b4e 100644 --- a/configuration.go +++ b/configuration.go @@ -6,6 +6,7 @@ type Configuration[T any] struct { itemsToPrune int deleteBuffer int promoteBuffer int + setableBuffer int getsPerPromote int32 tracking bool onDelete func(item *Item[T]) @@ -21,6 +22,7 @@ func Configure[T any]() *Configuration[T] { deleteBuffer: 1024, getsPerPromote: 3, promoteBuffer: 1024, + setableBuffer: 256, maxSize: 5000, tracking: false, } @@ -59,6 +61,14 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] { return c } +// The size of the queue for items which are set. If the queue fills, sets block. 
+// Setting this to 0 will ensure that the queue never grows beyond MaxSize+1 +// [256] +func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] { + c.setableBuffer = int(size) + return c +} + // The size of the queue for items which should be deleted. If the queue fills // up, calls to Delete() will block func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] { diff --git a/layeredcache.go b/layeredcache.go index ada096d..0d9f873 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -16,6 +16,7 @@ type LayeredCache[T any] struct { size int64 deletables chan *Item[T] promotables chan *Item[T] + setables chan *Item[T] } // Create a new layered cache with the specified configuration. @@ -40,6 +41,7 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] { buckets: make([]*layeredBucket[T], config.buckets), deletables: make(chan *Item[T], config.deleteBuffer), promotables: make(chan *Item[T], config.promoteBuffer), + setables: make(chan *Item[T], config.setableBuffer), } for i := 0; i < int(config.buckets); i++ { c.buckets[i] = &layeredBucket[T]{ @@ -186,7 +188,7 @@ func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.
if existing != nil { c.deletables <- existing } - c.promote(item) + c.setables <- item return item } @@ -196,10 +198,6 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] { return c.buckets[h.Sum32()&c.bucketMask] } -func (c *LayeredCache[T]) promote(item *Item[T]) { - c.promotables <- item -} - func (c *LayeredCache[T]) worker() { dropped := 0 cc := c.control @@ -214,6 +212,8 @@ select { case item := <-c.promotables: promoteItem(item) + case item := <-c.setables: + promoteItem(item) case item := <-c.deletables: c.doDelete(item) case control := <-cc: @@ -242,7 +242,7 @@ dropped += c.gc() msg.done <- struct{}{} case controlSyncUpdates: - doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete) + doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete) msg.done <- struct{}{} } } diff --git a/readme.md b/readme.md index 101927a..781559c 100644 --- a/readme.md +++ b/readme.md @@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100)) The most likely configuration options to tweak are: -* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000) +* `MaxSize(int)` - the maximum number of items to store in the cache (default: 5000) * `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3) -* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improved performance (default: 500) +* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`.
Freeing up more than 1 slot at a time improves performance (default: 500) Configurations that change the internals of the cache, which aren't as likely to need tweaking: diff --git a/secondarycache.go b/secondarycache.go index 0f62df3..83ce080 100644 --- a/secondarycache.go +++ b/secondarycache.go @@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio if existing != nil { s.pCache.deletables <- existing } - s.pCache.promote(item) + s.pCache.setables <- item return item }