2 Commits

Author SHA1 Message Date
Karl Seguin
aa97455599 add readme section on max size hard/soft limit 2023-03-08 14:09:21 +08:00
Karl Seguin
55899506d5 Add setable buffer to handle sets in order to allow a hard max size limit
Previously, items were pushed onto the frequency linked list via the promotable
buffer. As a general rule, you want your promotable buffer to be quite large,
since you don't want to block Gets. But because Set uses the same buffer, the
cache could grow to MaxSize + cap(promotables).

Sets are now "promoted" via a new "setables" buffer. These are handled exactly
the same way as before, but having it be a separate buffer means they can have
different capacity. Thus, using the new `SetableBuffer(int)` configuration
method can help set a hard limit on the maximum size.
2023-03-08 12:44:20 +08:00
7 changed files with 46 additions and 12 deletions

View File

@@ -1,6 +1,6 @@
.PHONY: t
t:
go test -race -count=1 ./...
go test -race -count=1 .
.PHONY: f
f:

View File

@@ -43,6 +43,7 @@ type Cache[T any] struct {
bucketMask uint32
deletables chan *Item[T]
promotables chan *Item[T]
setables chan *Item[T]
}
// Create a new cache with the specified configuration
@@ -56,6 +57,7 @@ func New[T any](config *Configuration[T]) *Cache[T] {
buckets: make([]*bucket[T], config.buckets),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
setables: make(chan *Item[T], config.setableBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &bucket[T]{
@@ -196,7 +198,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool)
if existing != nil {
c.deletables <- existing
}
c.promotables <- item
c.setables <- item
return item
}
@@ -220,6 +222,8 @@ func (c *Cache[T]) worker() {
select {
case item := <-c.promotables:
promoteItem(item)
case item := <-c.setables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-cc:
@@ -248,7 +252,7 @@ func (c *Cache[T]) worker() {
dropped += c.gc()
msg.done <- struct{}{}
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
@@ -272,6 +276,7 @@ drain:
// that were already sent by the same goroutine.
func doAllPendingPromotesAndDeletes[T any](
promotables <-chan *Item[T],
setables <-chan *Item[T],
promoteFn func(*Item[T]),
deletables <-chan *Item[T],
deleteFn func(*Item[T]),
@@ -281,6 +286,8 @@ doAllPromotes:
select {
case item := <-promotables:
promoteFn(item)
case item := <-setables:
promoteFn(item)
default:
break doAllPromotes
}

View File

@@ -361,6 +361,20 @@ func Test_ConcurrentStop(t *testing.T) {
}
}
func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) {
cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1))
cache.Set("a", "1", time.Minute)
cache.Set("b", "2", time.Minute)
cache.Set("c", "3", time.Minute)
cache.Set("d", "4", time.Minute)
cache.Set("e", "5", time.Minute)
assert.Nil(t, cache.Get("a"))
// "b" may or may not have been purged
assert.Equal(t, cache.Get("c").Value(), "3")
assert.Equal(t, cache.Get("d").Value(), "4")
assert.Equal(t, cache.Get("e").Value(), "5")
}
type SizedItem struct {
id int
s int64

View File

@@ -6,6 +6,7 @@ type Configuration[T any] struct {
itemsToPrune int
deleteBuffer int
promoteBuffer int
setableBuffer int
getsPerPromote int32
tracking bool
onDelete func(item *Item[T])
@@ -21,6 +22,7 @@ func Configure[T any]() *Configuration[T] {
deleteBuffer: 1024,
getsPerPromote: 3,
promoteBuffer: 1024,
setableBuffer: 256,
maxSize: 5000,
tracking: false,
}
@@ -59,6 +61,14 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] {
return c
}
// The size of the queue for items which are set. If the queue fills, sets block.
// Setting this to 0 will ensure that the cache never grows beyond MaxSize+1
// [256]
func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] {
c.setableBuffer = int(size)
return c
}
// The size of the queue for items which should be deleted. If the queue fills
// up, calls to Delete() will block
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {

View File

@@ -16,6 +16,7 @@ type LayeredCache[T any] struct {
size int64
deletables chan *Item[T]
promotables chan *Item[T]
setables chan *Item[T]
}
// Create a new layered cache with the specified configuration.
@@ -40,6 +41,7 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
buckets: make([]*layeredBucket[T], config.buckets),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
setables: make(chan *Item[T], config.setableBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket[T]{
@@ -186,7 +188,7 @@ func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.
if existing != nil {
c.deletables <- existing
}
c.promote(item)
c.setables <- item
return item
}
@@ -196,10 +198,6 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache[T]) promote(item *Item[T]) {
c.promotables <- item
}
func (c *LayeredCache[T]) worker() {
dropped := 0
cc := c.control
@@ -214,6 +212,8 @@ func (c *LayeredCache[T]) worker() {
select {
case item := <-c.promotables:
promoteItem(item)
case item := <-c.setables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-cc:
@@ -242,7 +242,7 @@ func (c *LayeredCache[T]) worker() {
dropped += c.gc()
msg.done <- struct{}{}
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}

View File

@@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100))
The most likely configuration options to tweak are:
* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000)
* `MaxSize(int)` - the maximum size to store in the cache (default: 5000)
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improved performance (default: 500)
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improves performance (default: 500)
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
@@ -47,6 +47,9 @@ Configurations that change the internals of the cache, which aren't as likely to
* `PromoteBuffer(int)` - the size of the buffer to use to queue promotions (default: 1024)
* `DeleteBuffer(int)` the size of the buffer to use to queue deletions (default: 1024)
## MaxSize Soft vs Hard Limit
By default, MaxSize is a soft limit. This is a result of the fact that cache pruning happens asynchronously in a separate "worker" goroutine. The hard limit is MaxSize + setable buffer size + 1. The setable buffer size can be configured via the `SetableBuffer(int)` configuration function. It defaults to 256.
## Usage
Once the cache is setup, you can `Get`, `Set` and `Delete` items from it. A `Get` returns an `*Item`:

View File

@@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio
if existing != nil {
s.pCache.deletables <- existing
}
s.pCache.promote(item)
s.pCache.promotables <- item
return item
}