Add setable buffer to handle sets in order to allow a hard max size limit
Previously, items were pushed onto the frequency linked list via the promotable buffer. As a general rule, you want your protobable buffer to be quite large, since you don't want to block Gets. But because Set uses the same buffer, the cache could grow to MaxSize + cap(promotables). Sets are now "promoted" via a new "setables" buffer. These are handled exactly the same way as before, but having it be a separate buffer means they can have different capacity. Thus, using the new `SetableBuffer(int)` configuration method can help set a hard limit on the maximum size.
This commit is contained in:
2
Makefile
2
Makefile
@@ -1,6 +1,6 @@
|
|||||||
.PHONY: t
|
.PHONY: t
|
||||||
t:
|
t:
|
||||||
go test -race -count=1 ./...
|
go test -race -count=1 .
|
||||||
|
|
||||||
.PHONY: f
|
.PHONY: f
|
||||||
f:
|
f:
|
||||||
|
11
cache.go
11
cache.go
@@ -43,6 +43,7 @@ type Cache[T any] struct {
|
|||||||
bucketMask uint32
|
bucketMask uint32
|
||||||
deletables chan *Item[T]
|
deletables chan *Item[T]
|
||||||
promotables chan *Item[T]
|
promotables chan *Item[T]
|
||||||
|
setables chan *Item[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new cache with the specified configuration
|
// Create a new cache with the specified configuration
|
||||||
@@ -56,6 +57,7 @@ func New[T any](config *Configuration[T]) *Cache[T] {
|
|||||||
buckets: make([]*bucket[T], config.buckets),
|
buckets: make([]*bucket[T], config.buckets),
|
||||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||||
|
setables: make(chan *Item[T], config.setableBuffer),
|
||||||
}
|
}
|
||||||
for i := 0; i < config.buckets; i++ {
|
for i := 0; i < config.buckets; i++ {
|
||||||
c.buckets[i] = &bucket[T]{
|
c.buckets[i] = &bucket[T]{
|
||||||
@@ -196,7 +198,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool)
|
|||||||
if existing != nil {
|
if existing != nil {
|
||||||
c.deletables <- existing
|
c.deletables <- existing
|
||||||
}
|
}
|
||||||
c.promotables <- item
|
c.setables <- item
|
||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,6 +222,8 @@ func (c *Cache[T]) worker() {
|
|||||||
select {
|
select {
|
||||||
case item := <-c.promotables:
|
case item := <-c.promotables:
|
||||||
promoteItem(item)
|
promoteItem(item)
|
||||||
|
case item := <-c.setables:
|
||||||
|
promoteItem(item)
|
||||||
case item := <-c.deletables:
|
case item := <-c.deletables:
|
||||||
c.doDelete(item)
|
c.doDelete(item)
|
||||||
case control := <-cc:
|
case control := <-cc:
|
||||||
@@ -248,7 +252,7 @@ func (c *Cache[T]) worker() {
|
|||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlSyncUpdates:
|
case controlSyncUpdates:
|
||||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -272,6 +276,7 @@ drain:
|
|||||||
// that were already sent by the same goroutine.
|
// that were already sent by the same goroutine.
|
||||||
func doAllPendingPromotesAndDeletes[T any](
|
func doAllPendingPromotesAndDeletes[T any](
|
||||||
promotables <-chan *Item[T],
|
promotables <-chan *Item[T],
|
||||||
|
setables <-chan *Item[T],
|
||||||
promoteFn func(*Item[T]),
|
promoteFn func(*Item[T]),
|
||||||
deletables <-chan *Item[T],
|
deletables <-chan *Item[T],
|
||||||
deleteFn func(*Item[T]),
|
deleteFn func(*Item[T]),
|
||||||
@@ -281,6 +286,8 @@ doAllPromotes:
|
|||||||
select {
|
select {
|
||||||
case item := <-promotables:
|
case item := <-promotables:
|
||||||
promoteFn(item)
|
promoteFn(item)
|
||||||
|
case item := <-setables:
|
||||||
|
promoteFn(item)
|
||||||
default:
|
default:
|
||||||
break doAllPromotes
|
break doAllPromotes
|
||||||
}
|
}
|
||||||
|
@@ -361,6 +361,20 @@ func Test_ConcurrentStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) {
|
||||||
|
cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1))
|
||||||
|
cache.Set("a", "1", time.Minute)
|
||||||
|
cache.Set("b", "2", time.Minute)
|
||||||
|
cache.Set("c", "3", time.Minute)
|
||||||
|
cache.Set("d", "4", time.Minute)
|
||||||
|
cache.Set("e", "5", time.Minute)
|
||||||
|
assert.Nil(t, cache.Get("a"))
|
||||||
|
// "b" could or could not be purged
|
||||||
|
assert.Equal(t, cache.Get("c").Value(), "3")
|
||||||
|
assert.Equal(t, cache.Get("d").Value(), "4")
|
||||||
|
assert.Equal(t, cache.Get("e").Value(), "5")
|
||||||
|
}
|
||||||
|
|
||||||
type SizedItem struct {
|
type SizedItem struct {
|
||||||
id int
|
id int
|
||||||
s int64
|
s int64
|
||||||
|
@@ -6,6 +6,7 @@ type Configuration[T any] struct {
|
|||||||
itemsToPrune int
|
itemsToPrune int
|
||||||
deleteBuffer int
|
deleteBuffer int
|
||||||
promoteBuffer int
|
promoteBuffer int
|
||||||
|
setableBuffer int
|
||||||
getsPerPromote int32
|
getsPerPromote int32
|
||||||
tracking bool
|
tracking bool
|
||||||
onDelete func(item *Item[T])
|
onDelete func(item *Item[T])
|
||||||
@@ -21,6 +22,7 @@ func Configure[T any]() *Configuration[T] {
|
|||||||
deleteBuffer: 1024,
|
deleteBuffer: 1024,
|
||||||
getsPerPromote: 3,
|
getsPerPromote: 3,
|
||||||
promoteBuffer: 1024,
|
promoteBuffer: 1024,
|
||||||
|
setableBuffer: 256,
|
||||||
maxSize: 5000,
|
maxSize: 5000,
|
||||||
tracking: false,
|
tracking: false,
|
||||||
}
|
}
|
||||||
@@ -59,6 +61,14 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The size of the queue for items which are set. If the queue fills, sets block.
|
||||||
|
// Setting this to 0 will ensure that the queue never grows being MaxSize+1
|
||||||
|
// [256]
|
||||||
|
func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] {
|
||||||
|
c.setableBuffer = int(size)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
// The size of the queue for items which should be deleted. If the queue fills
|
// The size of the queue for items which should be deleted. If the queue fills
|
||||||
// up, calls to Delete() will block
|
// up, calls to Delete() will block
|
||||||
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {
|
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {
|
||||||
|
@@ -16,6 +16,7 @@ type LayeredCache[T any] struct {
|
|||||||
size int64
|
size int64
|
||||||
deletables chan *Item[T]
|
deletables chan *Item[T]
|
||||||
promotables chan *Item[T]
|
promotables chan *Item[T]
|
||||||
|
setables chan *Item[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new layered cache with the specified configuration.
|
// Create a new layered cache with the specified configuration.
|
||||||
@@ -40,6 +41,7 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
|
|||||||
buckets: make([]*layeredBucket[T], config.buckets),
|
buckets: make([]*layeredBucket[T], config.buckets),
|
||||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||||
|
setables: make(chan *Item[T], config.setableBuffer),
|
||||||
}
|
}
|
||||||
for i := 0; i < int(config.buckets); i++ {
|
for i := 0; i < int(config.buckets); i++ {
|
||||||
c.buckets[i] = &layeredBucket[T]{
|
c.buckets[i] = &layeredBucket[T]{
|
||||||
@@ -186,7 +188,7 @@ func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.
|
|||||||
if existing != nil {
|
if existing != nil {
|
||||||
c.deletables <- existing
|
c.deletables <- existing
|
||||||
}
|
}
|
||||||
c.promote(item)
|
c.setables <- item
|
||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -196,10 +198,6 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
|
|||||||
return c.buckets[h.Sum32()&c.bucketMask]
|
return c.buckets[h.Sum32()&c.bucketMask]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LayeredCache[T]) promote(item *Item[T]) {
|
|
||||||
c.promotables <- item
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *LayeredCache[T]) worker() {
|
func (c *LayeredCache[T]) worker() {
|
||||||
dropped := 0
|
dropped := 0
|
||||||
cc := c.control
|
cc := c.control
|
||||||
@@ -214,6 +212,8 @@ func (c *LayeredCache[T]) worker() {
|
|||||||
select {
|
select {
|
||||||
case item := <-c.promotables:
|
case item := <-c.promotables:
|
||||||
promoteItem(item)
|
promoteItem(item)
|
||||||
|
case item := <-c.setables:
|
||||||
|
promoteItem(item)
|
||||||
case item := <-c.deletables:
|
case item := <-c.deletables:
|
||||||
c.doDelete(item)
|
c.doDelete(item)
|
||||||
case control := <-cc:
|
case control := <-cc:
|
||||||
@@ -242,7 +242,7 @@ func (c *LayeredCache[T]) worker() {
|
|||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlSyncUpdates:
|
case controlSyncUpdates:
|
||||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
doAllPendingPromotesAndDeletes(c.promotables, c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100))
|
|||||||
|
|
||||||
The most likely configuration options to tweak are:
|
The most likely configuration options to tweak are:
|
||||||
|
|
||||||
* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000)
|
* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000) ()
|
||||||
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
|
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
|
||||||
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improved performance (default: 500)
|
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improveds performance (default: 500)
|
||||||
|
|
||||||
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
|
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
|
||||||
|
|
||||||
|
@@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio
|
|||||||
if existing != nil {
|
if existing != nil {
|
||||||
s.pCache.deletables <- existing
|
s.pCache.deletables <- existing
|
||||||
}
|
}
|
||||||
s.pCache.promote(item)
|
s.pCache.promotables <- item
|
||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user