Compare commits
4 Commits
v3.0.2
...
setable_bu
Author | SHA1 | Date | |
---|---|---|---|
![]() |
aa97455599 | ||
![]() |
55899506d5 | ||
![]() |
35052434f3 | ||
![]() |
22776be1ee |
2
Makefile
2
Makefile
@@ -1,6 +1,6 @@
|
||||
.PHONY: t
|
||||
t:
|
||||
go test -race -count=1 ./...
|
||||
go test -race -count=1 .
|
||||
|
||||
.PHONY: f
|
||||
f:
|
||||
|
138
cache.go
138
cache.go
@@ -36,13 +36,14 @@ type gc struct {
|
||||
|
||||
type Cache[T any] struct {
|
||||
*Configuration[T]
|
||||
control
|
||||
list *List[*Item[T]]
|
||||
size int64
|
||||
buckets []*bucket[T]
|
||||
bucketMask uint32
|
||||
deletables chan *Item[T]
|
||||
promotables chan *Item[T]
|
||||
control chan interface{}
|
||||
setables chan *Item[T]
|
||||
}
|
||||
|
||||
// Create a new cache with the specified configuration
|
||||
@@ -51,16 +52,19 @@ func New[T any](config *Configuration[T]) *Cache[T] {
|
||||
c := &Cache[T]{
|
||||
list: NewList[*Item[T]](),
|
||||
Configuration: config,
|
||||
control: newControl(),
|
||||
bucketMask: uint32(config.buckets) - 1,
|
||||
buckets: make([]*bucket[T], config.buckets),
|
||||
control: make(chan interface{}),
|
||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||
setables: make(chan *Item[T], config.setableBuffer),
|
||||
}
|
||||
for i := 0; i < config.buckets; i++ {
|
||||
c.buckets[i] = &bucket[T]{
|
||||
lookup: make(map[string]*Item[T]),
|
||||
}
|
||||
}
|
||||
c.restart()
|
||||
go c.worker()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -184,94 +188,6 @@ func (c *Cache[T]) Delete(key string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Clears the cache
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) Clear() {
|
||||
done := make(chan struct{})
|
||||
c.control <- clear{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Stops the background worker. Operations performed on the cache after Stop
|
||||
// is called are likely to panic
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) Stop() {
|
||||
close(c.promotables)
|
||||
<-c.control
|
||||
}
|
||||
|
||||
// Gets the number of items removed from the cache due to memory pressure since
|
||||
// the last time GetDropped was called
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) GetDropped() int {
|
||||
return doGetDropped(c.control)
|
||||
}
|
||||
|
||||
func doGetDropped(controlCh chan<- interface{}) int {
|
||||
res := make(chan int)
|
||||
controlCh <- getDropped{res: res}
|
||||
return <-res
|
||||
}
|
||||
|
||||
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
||||
// that were done by the current goroutine up to now.
|
||||
//
|
||||
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
|
||||
// goroutine that updates its internal data structures asynchronously. This means that the
|
||||
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
|
||||
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
|
||||
// application code will not care about this, but especially in a test scenario you may want to
|
||||
// be able to know when the worker has caught up.
|
||||
//
|
||||
// This applies only to cache methods that were previously called by the same goroutine that is
|
||||
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
|
||||
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) SyncUpdates() {
|
||||
doSyncUpdates(c.control)
|
||||
}
|
||||
|
||||
func doSyncUpdates(controlCh chan<- interface{}) {
|
||||
done := make(chan struct{})
|
||||
controlCh <- syncWorker{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Sets a new max size. That can result in a GC being run if the new maxium size
|
||||
// is smaller than the cached size
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) SetMaxSize(size int64) {
|
||||
done := make(chan struct{})
|
||||
c.control <- setMaxSize{size: size, done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Forces GC. There should be no reason to call this function, except from tests
|
||||
// which require synchronous GC.
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) GC() {
|
||||
done := make(chan struct{})
|
||||
c.control <- gc{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
||||
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
||||
// from tests.
|
||||
// This is a control command.
|
||||
func (c *Cache[T]) GetSize() int64 {
|
||||
res := make(chan int64)
|
||||
c.control <- getSize{res}
|
||||
return <-res
|
||||
}
|
||||
|
||||
func (c *Cache[T]) restart() {
|
||||
c.deletables = make(chan *Item[T], c.deleteBuffer)
|
||||
c.promotables = make(chan *Item[T], c.promoteBuffer)
|
||||
c.control = make(chan interface{})
|
||||
go c.worker()
|
||||
}
|
||||
|
||||
func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
|
||||
bucket.delete(item.key) //stop other GETs from getting it
|
||||
c.deletables <- item
|
||||
@@ -282,7 +198,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool)
|
||||
if existing != nil {
|
||||
c.deletables <- existing
|
||||
}
|
||||
c.promotables <- item
|
||||
c.setables <- item
|
||||
return item
|
||||
}
|
||||
|
||||
@@ -293,48 +209,50 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
|
||||
}
|
||||
|
||||
func (c *Cache[T]) worker() {
|
||||
defer close(c.control)
|
||||
dropped := 0
|
||||
cc := c.control
|
||||
|
||||
promoteItem := func(item *Item[T]) {
|
||||
if c.doPromote(item) && c.size > c.maxSize {
|
||||
dropped += c.gc()
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case item, ok := <-c.promotables:
|
||||
if ok == false {
|
||||
goto drain
|
||||
}
|
||||
case item := <-c.promotables:
|
||||
promoteItem(item)
|
||||
case item := <-c.setables:
|
||||
promoteItem(item)
|
||||
case item := <-c.deletables:
|
||||
c.doDelete(item)
|
||||
case control := <-c.control:
|
||||
case control := <-cc:
|
||||
switch msg := control.(type) {
|
||||
case getDropped:
|
||||
case controlStop:
|
||||
goto drain
|
||||
case controlGetDropped:
|
||||
msg.res <- dropped
|
||||
dropped = 0
|
||||
case setMaxSize:
|
||||
case controlSetMaxSize:
|
||||
c.maxSize = msg.size
|
||||
if c.size > c.maxSize {
|
||||
dropped += c.gc()
|
||||
}
|
||||
msg.done <- struct{}{}
|
||||
case clear:
|
||||
case controlClear:
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
msg.done <- struct{}{}
|
||||
case getSize:
|
||||
case controlGetSize:
|
||||
msg.res <- c.size
|
||||
case gc:
|
||||
case controlGC:
|
||||
dropped += c.gc()
|
||||
msg.done <- struct{}{}
|
||||
case syncWorker:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
|
||||
c.deletables, c.doDelete)
|
||||
case controlSyncUpdates:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
|
||||
msg.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
@@ -346,7 +264,6 @@ drain:
|
||||
case item := <-c.deletables:
|
||||
c.doDelete(item)
|
||||
default:
|
||||
close(c.deletables)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -359,6 +276,7 @@ drain:
|
||||
// that were already sent by the same goroutine.
|
||||
func doAllPendingPromotesAndDeletes[T any](
|
||||
promotables <-chan *Item[T],
|
||||
setables <-chan *Item[T],
|
||||
promoteFn func(*Item[T]),
|
||||
deletables <-chan *Item[T],
|
||||
deleteFn func(*Item[T]),
|
||||
@@ -367,9 +285,9 @@ doAllPromotes:
|
||||
for {
|
||||
select {
|
||||
case item := <-promotables:
|
||||
if item != nil {
|
||||
promoteFn(item)
|
||||
}
|
||||
promoteFn(item)
|
||||
case item := <-setables:
|
||||
promoteFn(item)
|
||||
default:
|
||||
break doAllPromotes
|
||||
}
|
||||
|
@@ -337,6 +337,44 @@ func Test_CachePrune(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ConcurrentStop(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
cache := New(Configure[string]())
|
||||
r := func() {
|
||||
for {
|
||||
key := strconv.Itoa(int(rand.Int31n(100)))
|
||||
switch rand.Int31n(3) {
|
||||
case 0:
|
||||
cache.Get(key)
|
||||
case 1:
|
||||
cache.Set(key, key, time.Minute)
|
||||
case 2:
|
||||
cache.Delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
go r()
|
||||
go r()
|
||||
go r()
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
cache.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) {
|
||||
cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1))
|
||||
cache.Set("a", "1", time.Minute)
|
||||
cache.Set("b", "2", time.Minute)
|
||||
cache.Set("c", "3", time.Minute)
|
||||
cache.Set("d", "4", time.Minute)
|
||||
cache.Set("e", "5", time.Minute)
|
||||
assert.Nil(t, cache.Get("a"))
|
||||
// "b" could or could not be purged
|
||||
assert.Equal(t, cache.Get("c").Value(), "3")
|
||||
assert.Equal(t, cache.Get("d").Value(), "4")
|
||||
assert.Equal(t, cache.Get("e").Value(), "5")
|
||||
}
|
||||
|
||||
type SizedItem struct {
|
||||
id int
|
||||
s int64
|
||||
|
@@ -6,6 +6,7 @@ type Configuration[T any] struct {
|
||||
itemsToPrune int
|
||||
deleteBuffer int
|
||||
promoteBuffer int
|
||||
setableBuffer int
|
||||
getsPerPromote int32
|
||||
tracking bool
|
||||
onDelete func(item *Item[T])
|
||||
@@ -21,6 +22,7 @@ func Configure[T any]() *Configuration[T] {
|
||||
deleteBuffer: 1024,
|
||||
getsPerPromote: 3,
|
||||
promoteBuffer: 1024,
|
||||
setableBuffer: 256,
|
||||
maxSize: 5000,
|
||||
tracking: false,
|
||||
}
|
||||
@@ -59,6 +61,14 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] {
|
||||
return c
|
||||
}
|
||||
|
||||
// The size of the queue for items which are set. If the queue fills, sets block.
|
||||
// Setting this to 0 will ensure that the queue never grows being MaxSize+1
|
||||
// [256]
|
||||
func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] {
|
||||
c.setableBuffer = int(size)
|
||||
return c
|
||||
}
|
||||
|
||||
// The size of the queue for items which should be deleted. If the queue fills
|
||||
// up, calls to Delete() will block
|
||||
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {
|
||||
|
110
control.go
Normal file
110
control.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package ccache
|
||||
|
||||
type controlGC struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type controlClear struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type controlStop struct {
|
||||
}
|
||||
|
||||
type controlGetSize struct {
|
||||
res chan int64
|
||||
}
|
||||
|
||||
type controlGetDropped struct {
|
||||
res chan int
|
||||
}
|
||||
|
||||
type controlSetMaxSize struct {
|
||||
size int64
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type controlSyncUpdates struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type control chan interface{}
|
||||
|
||||
func newControl() chan interface{} {
|
||||
return make(chan interface{}, 5)
|
||||
}
|
||||
|
||||
// Forces GC. There should be no reason to call this function, except from tests
|
||||
// which require synchronous GC.
|
||||
// This is a control command.
|
||||
func (c control) GC() {
|
||||
done := make(chan struct{})
|
||||
c <- controlGC{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Sends a stop signal to the worker thread. The worker thread will shut down
|
||||
// 5 seconds after the last message is received. The cache should not be used
|
||||
// after Stop is called, but concurrently executing requests should properly finish
|
||||
// executing.
|
||||
// This is a control command.
|
||||
func (c control) Stop() {
|
||||
c.SyncUpdates()
|
||||
c <- controlStop{}
|
||||
}
|
||||
|
||||
// Clears the cache
|
||||
// This is a control command.
|
||||
func (c control) Clear() {
|
||||
done := make(chan struct{})
|
||||
c <- controlClear{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
||||
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
||||
// from tests.
|
||||
// This is a control command.
|
||||
func (c control) GetSize() int64 {
|
||||
res := make(chan int64)
|
||||
c <- controlGetSize{res: res}
|
||||
return <-res
|
||||
}
|
||||
|
||||
// Gets the number of items removed from the cache due to memory pressure since
|
||||
// the last time GetDropped was called
|
||||
// This is a control command.
|
||||
func (c control) GetDropped() int {
|
||||
res := make(chan int)
|
||||
c <- controlGetDropped{res: res}
|
||||
return <-res
|
||||
}
|
||||
|
||||
// Sets a new max size. That can result in a GC being run if the new maxium size
|
||||
// is smaller than the cached size
|
||||
// This is a control command.
|
||||
func (c control) SetMaxSize(size int64) {
|
||||
done := make(chan struct{})
|
||||
c <- controlSetMaxSize{size: size, done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
||||
// that were done by the current goroutine up to now.
|
||||
//
|
||||
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
|
||||
// goroutine that updates its internal data structures asynchronously. This means that the
|
||||
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
|
||||
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
|
||||
// application code will not care about this, but especially in a test scenario you may want to
|
||||
// be able to know when the worker has caught up.
|
||||
//
|
||||
// This applies only to cache methods that were previously called by the same goroutine that is
|
||||
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
|
||||
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
|
||||
// This is a control command.
|
||||
func (c control) SyncUpdates() {
|
||||
done := make(chan struct{})
|
||||
c <- controlSyncUpdates{done: done}
|
||||
<-done
|
||||
}
|
141
layeredcache.go
141
layeredcache.go
@@ -9,13 +9,14 @@ import (
|
||||
|
||||
type LayeredCache[T any] struct {
|
||||
*Configuration[T]
|
||||
control
|
||||
list *List[*Item[T]]
|
||||
buckets []*layeredBucket[T]
|
||||
bucketMask uint32
|
||||
size int64
|
||||
deletables chan *Item[T]
|
||||
promotables chan *Item[T]
|
||||
control chan interface{}
|
||||
setables chan *Item[T]
|
||||
}
|
||||
|
||||
// Create a new layered cache with the specified configuration.
|
||||
@@ -35,17 +36,19 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
|
||||
c := &LayeredCache[T]{
|
||||
list: NewList[*Item[T]](),
|
||||
Configuration: config,
|
||||
control: newControl(),
|
||||
bucketMask: uint32(config.buckets) - 1,
|
||||
buckets: make([]*layeredBucket[T], config.buckets),
|
||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||
control: make(chan interface{}),
|
||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||
setables: make(chan *Item[T], config.setableBuffer),
|
||||
}
|
||||
for i := 0; i < int(config.buckets); i++ {
|
||||
c.buckets[i] = &layeredBucket[T]{
|
||||
buckets: make(map[string]*bucket[T]),
|
||||
}
|
||||
}
|
||||
c.restart()
|
||||
go c.worker()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -180,69 +183,12 @@ func (c *LayeredCache[T]) DeleteFunc(primary string, matches func(key string, it
|
||||
return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
|
||||
}
|
||||
|
||||
// Clears the cache
|
||||
func (c *LayeredCache[T]) Clear() {
|
||||
done := make(chan struct{})
|
||||
c.control <- clear{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) Stop() {
|
||||
close(c.promotables)
|
||||
<-c.control
|
||||
}
|
||||
|
||||
// Gets the number of items removed from the cache due to memory pressure since
|
||||
// the last time GetDropped was called
|
||||
func (c *LayeredCache[T]) GetDropped() int {
|
||||
return doGetDropped(c.control)
|
||||
}
|
||||
|
||||
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
||||
// that were done by the current goroutine up to now. See Cache.SyncUpdates for details.
|
||||
func (c *LayeredCache[T]) SyncUpdates() {
|
||||
doSyncUpdates(c.control)
|
||||
}
|
||||
|
||||
// Sets a new max size. That can result in a GC being run if the new maxium size
|
||||
// is smaller than the cached size
|
||||
func (c *LayeredCache[T]) SetMaxSize(size int64) {
|
||||
done := make(chan struct{})
|
||||
c.control <- setMaxSize{size: size, done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Forces GC. There should be no reason to call this function, except from tests
|
||||
// which require synchronous GC.
|
||||
// This is a control command.
|
||||
func (c *LayeredCache[T]) GC() {
|
||||
done := make(chan struct{})
|
||||
c.control <- gc{done: done}
|
||||
<-done
|
||||
}
|
||||
|
||||
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
||||
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
||||
// from tests.
|
||||
// This is a control command.
|
||||
func (c *LayeredCache[T]) GetSize() int64 {
|
||||
res := make(chan int64)
|
||||
c.control <- getSize{res}
|
||||
return <-res
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) restart() {
|
||||
c.promotables = make(chan *Item[T], c.promoteBuffer)
|
||||
c.control = make(chan interface{})
|
||||
go c.worker()
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.Duration, track bool) *Item[T] {
|
||||
item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
|
||||
if existing != nil {
|
||||
c.deletables <- existing
|
||||
}
|
||||
c.promote(item)
|
||||
c.setables <- item
|
||||
return item
|
||||
}
|
||||
|
||||
@@ -252,70 +198,79 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
|
||||
return c.buckets[h.Sum32()&c.bucketMask]
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) promote(item *Item[T]) {
|
||||
c.promotables <- item
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) worker() {
|
||||
defer close(c.control)
|
||||
dropped := 0
|
||||
cc := c.control
|
||||
|
||||
promoteItem := func(item *Item[T]) {
|
||||
if c.doPromote(item) && c.size > c.maxSize {
|
||||
dropped += c.gc()
|
||||
}
|
||||
}
|
||||
deleteItem := func(item *Item[T]) {
|
||||
if item.node == nil {
|
||||
item.promotions = -2
|
||||
} else {
|
||||
c.size -= item.size
|
||||
if c.onDelete != nil {
|
||||
c.onDelete(item)
|
||||
}
|
||||
c.list.Remove(item.node)
|
||||
item.node = nil
|
||||
item.promotions = -2
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case item, ok := <-c.promotables:
|
||||
if ok == false {
|
||||
return
|
||||
}
|
||||
case item := <-c.promotables:
|
||||
promoteItem(item)
|
||||
case item := <-c.setables:
|
||||
promoteItem(item)
|
||||
case item := <-c.deletables:
|
||||
deleteItem(item)
|
||||
case control := <-c.control:
|
||||
c.doDelete(item)
|
||||
case control := <-cc:
|
||||
switch msg := control.(type) {
|
||||
case getDropped:
|
||||
case controlStop:
|
||||
goto drain
|
||||
case controlGetDropped:
|
||||
msg.res <- dropped
|
||||
dropped = 0
|
||||
case setMaxSize:
|
||||
case controlSetMaxSize:
|
||||
c.maxSize = msg.size
|
||||
if c.size > c.maxSize {
|
||||
dropped += c.gc()
|
||||
}
|
||||
msg.done <- struct{}{}
|
||||
case clear:
|
||||
case controlClear:
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
msg.done <- struct{}{}
|
||||
case getSize:
|
||||
case controlGetSize:
|
||||
msg.res <- c.size
|
||||
case gc:
|
||||
case controlGC:
|
||||
dropped += c.gc()
|
||||
msg.done <- struct{}{}
|
||||
case syncWorker:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
|
||||
c.deletables, deleteItem)
|
||||
case controlSyncUpdates:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||
msg.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drain:
|
||||
for {
|
||||
select {
|
||||
case item := <-c.deletables:
|
||||
c.doDelete(item)
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) doDelete(item *Item[T]) {
|
||||
if item.node == nil {
|
||||
item.promotions = -2
|
||||
} else {
|
||||
c.size -= item.size
|
||||
if c.onDelete != nil {
|
||||
c.onDelete(item)
|
||||
}
|
||||
c.list.Remove(item.node)
|
||||
item.node = nil
|
||||
item.promotions = -2
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) doPromote(item *Item[T]) bool {
|
||||
|
@@ -396,6 +396,29 @@ func Test_LayeredCachePrune(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_LayeredConcurrentStop(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
cache := Layered(Configure[string]())
|
||||
r := func() {
|
||||
for {
|
||||
key := strconv.Itoa(int(rand.Int31n(100)))
|
||||
switch rand.Int31n(3) {
|
||||
case 0:
|
||||
cache.Get(key, key)
|
||||
case 1:
|
||||
cache.Set(key, key, key, time.Minute)
|
||||
case 2:
|
||||
cache.Delete(key, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
go r()
|
||||
go r()
|
||||
go r()
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
cache.Stop()
|
||||
}
|
||||
}
|
||||
func newLayered[T any]() *LayeredCache[T] {
|
||||
c := Layered[T](Configure[T]())
|
||||
c.Clear()
|
||||
|
@@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100))
|
||||
|
||||
The most likely configuration options to tweak are:
|
||||
|
||||
* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000)
|
||||
* `MaxSize(int)` - the maximum size to store in the cache (default: 5000)
|
||||
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
|
||||
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improved performance (default: 500)
|
||||
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improves performance (default: 500)
|
||||
|
||||
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
|
||||
|
||||
@@ -47,6 +47,9 @@ Configurations that change the internals of the cache, which aren't as likely to
|
||||
* `PromoteBuffer(int)` - the size of the buffer to use to queue promotions (default: 1024)
|
||||
* `DeleteBuffer(int)` the size of the buffer to use to queue deletions (default: 1024)
|
||||
|
||||
## MaxSize Soft vs Hard Limit
|
||||
By default, MaxSize is a soft limit. This is a result of the fact that cache pruning happens asychronously in a separate "worker" goroutine. The hard limit is MaxSize + SetableBufferSize + 1. The SettableBufferSize can be configured via the `SetableBuffer(INT)` configuration function. It defaults to 256.
|
||||
|
||||
## Usage
|
||||
|
||||
Once the cache is setup, you can `Get`, `Set` and `Delete` items from it. A `Get` returns an `*Item`:
|
||||
|
@@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio
|
||||
if existing != nil {
|
||||
s.pCache.deletables <- existing
|
||||
}
|
||||
s.pCache.promote(item)
|
||||
s.pCache.promotables <- item
|
||||
return item
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user