Refactor control messages + Stop handling
Move the control API shared between Cache and LayeredCache into its own struct. But keep the control logic handling separate - it requires access to the local values, like dropped and deleteItem. Stop is now a control message. Channels are no longer closed as part of the stop process.
This commit is contained in:
127
cache.go
127
cache.go
@@ -36,13 +36,13 @@ type gc struct {
|
|||||||
|
|
||||||
type Cache[T any] struct {
|
type Cache[T any] struct {
|
||||||
*Configuration[T]
|
*Configuration[T]
|
||||||
|
control
|
||||||
list *List[*Item[T]]
|
list *List[*Item[T]]
|
||||||
size int64
|
size int64
|
||||||
buckets []*bucket[T]
|
buckets []*bucket[T]
|
||||||
bucketMask uint32
|
bucketMask uint32
|
||||||
deletables chan *Item[T]
|
deletables chan *Item[T]
|
||||||
promotables chan *Item[T]
|
promotables chan *Item[T]
|
||||||
control chan interface{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new cache with the specified configuration
|
// Create a new cache with the specified configuration
|
||||||
@@ -51,16 +51,18 @@ func New[T any](config *Configuration[T]) *Cache[T] {
|
|||||||
c := &Cache[T]{
|
c := &Cache[T]{
|
||||||
list: NewList[*Item[T]](),
|
list: NewList[*Item[T]](),
|
||||||
Configuration: config,
|
Configuration: config,
|
||||||
|
control: newControl(),
|
||||||
bucketMask: uint32(config.buckets) - 1,
|
bucketMask: uint32(config.buckets) - 1,
|
||||||
buckets: make([]*bucket[T], config.buckets),
|
buckets: make([]*bucket[T], config.buckets),
|
||||||
control: make(chan interface{}),
|
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||||
|
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||||
}
|
}
|
||||||
for i := 0; i < config.buckets; i++ {
|
for i := 0; i < config.buckets; i++ {
|
||||||
c.buckets[i] = &bucket[T]{
|
c.buckets[i] = &bucket[T]{
|
||||||
lookup: make(map[string]*Item[T]),
|
lookup: make(map[string]*Item[T]),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.restart()
|
go c.worker()
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,94 +186,6 @@ func (c *Cache[T]) Delete(key string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clears the cache
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) Clear() {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- clear{done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stops the background worker. Operations performed on the cache after Stop
|
|
||||||
// is called are likely to panic
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) Stop() {
|
|
||||||
close(c.promotables)
|
|
||||||
<-c.control
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the number of items removed from the cache due to memory pressure since
|
|
||||||
// the last time GetDropped was called
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) GetDropped() int {
|
|
||||||
return doGetDropped(c.control)
|
|
||||||
}
|
|
||||||
|
|
||||||
func doGetDropped(controlCh chan<- interface{}) int {
|
|
||||||
res := make(chan int)
|
|
||||||
controlCh <- getDropped{res: res}
|
|
||||||
return <-res
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
|
||||||
// that were done by the current goroutine up to now.
|
|
||||||
//
|
|
||||||
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
|
|
||||||
// goroutine that updates its internal data structures asynchronously. This means that the
|
|
||||||
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
|
|
||||||
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
|
|
||||||
// application code will not care about this, but especially in a test scenario you may want to
|
|
||||||
// be able to know when the worker has caught up.
|
|
||||||
//
|
|
||||||
// This applies only to cache methods that were previously called by the same goroutine that is
|
|
||||||
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
|
|
||||||
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) SyncUpdates() {
|
|
||||||
doSyncUpdates(c.control)
|
|
||||||
}
|
|
||||||
|
|
||||||
func doSyncUpdates(controlCh chan<- interface{}) {
|
|
||||||
done := make(chan struct{})
|
|
||||||
controlCh <- syncWorker{done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets a new max size. That can result in a GC being run if the new maxium size
|
|
||||||
// is smaller than the cached size
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) SetMaxSize(size int64) {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- setMaxSize{size: size, done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forces GC. There should be no reason to call this function, except from tests
|
|
||||||
// which require synchronous GC.
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) GC() {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- gc{done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
|
||||||
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
|
||||||
// from tests.
|
|
||||||
// This is a control command.
|
|
||||||
func (c *Cache[T]) GetSize() int64 {
|
|
||||||
res := make(chan int64)
|
|
||||||
c.control <- getSize{res}
|
|
||||||
return <-res
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[T]) restart() {
|
|
||||||
c.deletables = make(chan *Item[T], c.deleteBuffer)
|
|
||||||
c.promotables = make(chan *Item[T], c.promoteBuffer)
|
|
||||||
c.control = make(chan interface{})
|
|
||||||
go c.worker()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
|
func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
|
||||||
bucket.delete(item.key) //stop other GETs from getting it
|
bucket.delete(item.key) //stop other GETs from getting it
|
||||||
c.deletables <- item
|
c.deletables <- item
|
||||||
@@ -293,48 +207,48 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cache[T]) worker() {
|
func (c *Cache[T]) worker() {
|
||||||
defer close(c.control)
|
|
||||||
dropped := 0
|
dropped := 0
|
||||||
|
cc := c.control
|
||||||
|
|
||||||
promoteItem := func(item *Item[T]) {
|
promoteItem := func(item *Item[T]) {
|
||||||
if c.doPromote(item) && c.size > c.maxSize {
|
if c.doPromote(item) && c.size > c.maxSize {
|
||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case item, ok := <-c.promotables:
|
case item := <-c.promotables:
|
||||||
if ok == false {
|
|
||||||
goto drain
|
|
||||||
}
|
|
||||||
promoteItem(item)
|
promoteItem(item)
|
||||||
case item := <-c.deletables:
|
case item := <-c.deletables:
|
||||||
c.doDelete(item)
|
c.doDelete(item)
|
||||||
case control := <-c.control:
|
case control := <-cc:
|
||||||
switch msg := control.(type) {
|
switch msg := control.(type) {
|
||||||
case getDropped:
|
case controlStop:
|
||||||
|
goto drain
|
||||||
|
case controlGetDropped:
|
||||||
msg.res <- dropped
|
msg.res <- dropped
|
||||||
dropped = 0
|
dropped = 0
|
||||||
case setMaxSize:
|
case controlSetMaxSize:
|
||||||
c.maxSize = msg.size
|
c.maxSize = msg.size
|
||||||
if c.size > c.maxSize {
|
if c.size > c.maxSize {
|
||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
}
|
}
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case clear:
|
case controlClear:
|
||||||
for _, bucket := range c.buckets {
|
for _, bucket := range c.buckets {
|
||||||
bucket.clear()
|
bucket.clear()
|
||||||
}
|
}
|
||||||
c.size = 0
|
c.size = 0
|
||||||
c.list = NewList[*Item[T]]()
|
c.list = NewList[*Item[T]]()
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case getSize:
|
case controlGetSize:
|
||||||
msg.res <- c.size
|
msg.res <- c.size
|
||||||
case gc:
|
case controlGC:
|
||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case syncWorker:
|
case controlSyncUpdates:
|
||||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
|
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||||
c.deletables, c.doDelete)
|
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -346,7 +260,6 @@ drain:
|
|||||||
case item := <-c.deletables:
|
case item := <-c.deletables:
|
||||||
c.doDelete(item)
|
c.doDelete(item)
|
||||||
default:
|
default:
|
||||||
close(c.deletables)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -367,9 +280,7 @@ doAllPromotes:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case item := <-promotables:
|
case item := <-promotables:
|
||||||
if item != nil {
|
|
||||||
promoteFn(item)
|
promoteFn(item)
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
break doAllPromotes
|
break doAllPromotes
|
||||||
}
|
}
|
||||||
|
@@ -337,6 +337,30 @@ func Test_CachePrune(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_ConcurrentStop(t *testing.T) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
cache := New(Configure[string]())
|
||||||
|
r := func() {
|
||||||
|
for {
|
||||||
|
key := strconv.Itoa(int(rand.Int31n(100)))
|
||||||
|
switch rand.Int31n(3) {
|
||||||
|
case 0:
|
||||||
|
cache.Get(key)
|
||||||
|
case 1:
|
||||||
|
cache.Set(key, key, time.Minute)
|
||||||
|
case 2:
|
||||||
|
cache.Delete(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go r()
|
||||||
|
go r()
|
||||||
|
go r()
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
cache.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type SizedItem struct {
|
type SizedItem struct {
|
||||||
id int
|
id int
|
||||||
s int64
|
s int64
|
||||||
|
110
control.go
Normal file
110
control.go
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
package ccache
|
||||||
|
|
||||||
|
type controlGC struct {
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlClear struct {
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlStop struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlGetSize struct {
|
||||||
|
res chan int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlGetDropped struct {
|
||||||
|
res chan int
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlSetMaxSize struct {
|
||||||
|
size int64
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlSyncUpdates struct {
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type control chan interface{}
|
||||||
|
|
||||||
|
func newControl() chan interface{} {
|
||||||
|
return make(chan interface{}, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forces GC. There should be no reason to call this function, except from tests
|
||||||
|
// which require synchronous GC.
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) GC() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
c <- controlGC{done: done}
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sends a stop signal to the worker thread. The worker thread will shut down
|
||||||
|
// 5 seconds after the last message is received. The cache should not be used
|
||||||
|
// after Stop is called, but concurrently executing requests should properly finish
|
||||||
|
// executing.
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) Stop() {
|
||||||
|
c.SyncUpdates()
|
||||||
|
c <- controlStop{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clears the cache
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) Clear() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
c <- controlClear{done: done}
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
||||||
|
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
||||||
|
// from tests.
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) GetSize() int64 {
|
||||||
|
res := make(chan int64)
|
||||||
|
c <- controlGetSize{res: res}
|
||||||
|
return <-res
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the number of items removed from the cache due to memory pressure since
|
||||||
|
// the last time GetDropped was called
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) GetDropped() int {
|
||||||
|
res := make(chan int)
|
||||||
|
c <- controlGetDropped{res: res}
|
||||||
|
return <-res
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets a new max size. That can result in a GC being run if the new maxium size
|
||||||
|
// is smaller than the cached size
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) SetMaxSize(size int64) {
|
||||||
|
done := make(chan struct{})
|
||||||
|
c <- controlSetMaxSize{size: size, done: done}
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
||||||
|
// that were done by the current goroutine up to now.
|
||||||
|
//
|
||||||
|
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
|
||||||
|
// goroutine that updates its internal data structures asynchronously. This means that the
|
||||||
|
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
|
||||||
|
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
|
||||||
|
// application code will not care about this, but especially in a test scenario you may want to
|
||||||
|
// be able to know when the worker has caught up.
|
||||||
|
//
|
||||||
|
// This applies only to cache methods that were previously called by the same goroutine that is
|
||||||
|
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
|
||||||
|
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
|
||||||
|
// This is a control command.
|
||||||
|
func (c control) SyncUpdates() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
c <- controlSyncUpdates{done: done}
|
||||||
|
<-done
|
||||||
|
}
|
159
layeredcache.go
159
layeredcache.go
@@ -9,13 +9,13 @@ import (
|
|||||||
|
|
||||||
type LayeredCache[T any] struct {
|
type LayeredCache[T any] struct {
|
||||||
*Configuration[T]
|
*Configuration[T]
|
||||||
|
control
|
||||||
list *List[*Item[T]]
|
list *List[*Item[T]]
|
||||||
buckets []*layeredBucket[T]
|
buckets []*layeredBucket[T]
|
||||||
bucketMask uint32
|
bucketMask uint32
|
||||||
size int64
|
size int64
|
||||||
deletables chan *Item[T]
|
deletables chan *Item[T]
|
||||||
promotables chan *Item[T]
|
promotables chan *Item[T]
|
||||||
control chan interface{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new layered cache with the specified configuration.
|
// Create a new layered cache with the specified configuration.
|
||||||
@@ -35,17 +35,18 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
|
|||||||
c := &LayeredCache[T]{
|
c := &LayeredCache[T]{
|
||||||
list: NewList[*Item[T]](),
|
list: NewList[*Item[T]](),
|
||||||
Configuration: config,
|
Configuration: config,
|
||||||
|
control: newControl(),
|
||||||
bucketMask: uint32(config.buckets) - 1,
|
bucketMask: uint32(config.buckets) - 1,
|
||||||
buckets: make([]*layeredBucket[T], config.buckets),
|
buckets: make([]*layeredBucket[T], config.buckets),
|
||||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||||
control: make(chan interface{}),
|
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||||
}
|
}
|
||||||
for i := 0; i < int(config.buckets); i++ {
|
for i := 0; i < int(config.buckets); i++ {
|
||||||
c.buckets[i] = &layeredBucket[T]{
|
c.buckets[i] = &layeredBucket[T]{
|
||||||
buckets: make(map[string]*bucket[T]),
|
buckets: make(map[string]*bucket[T]),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.restart()
|
go c.worker()
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,63 +181,6 @@ func (c *LayeredCache[T]) DeleteFunc(primary string, matches func(key string, it
|
|||||||
return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
|
return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clears the cache
|
|
||||||
func (c *LayeredCache[T]) Clear() {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- clear{done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *LayeredCache[T]) Stop() {
|
|
||||||
close(c.promotables)
|
|
||||||
<-c.control
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the number of items removed from the cache due to memory pressure since
|
|
||||||
// the last time GetDropped was called
|
|
||||||
func (c *LayeredCache[T]) GetDropped() int {
|
|
||||||
return doGetDropped(c.control)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
|
|
||||||
// that were done by the current goroutine up to now. See Cache.SyncUpdates for details.
|
|
||||||
func (c *LayeredCache[T]) SyncUpdates() {
|
|
||||||
doSyncUpdates(c.control)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets a new max size. That can result in a GC being run if the new maxium size
|
|
||||||
// is smaller than the cached size
|
|
||||||
func (c *LayeredCache[T]) SetMaxSize(size int64) {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- setMaxSize{size: size, done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forces GC. There should be no reason to call this function, except from tests
|
|
||||||
// which require synchronous GC.
|
|
||||||
// This is a control command.
|
|
||||||
func (c *LayeredCache[T]) GC() {
|
|
||||||
done := make(chan struct{})
|
|
||||||
c.control <- gc{done: done}
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the size of the cache. This is an O(1) call to make, but it is handled
|
|
||||||
// by the worker goroutine. It's meant to be called periodically for metrics, or
|
|
||||||
// from tests.
|
|
||||||
// This is a control command.
|
|
||||||
func (c *LayeredCache[T]) GetSize() int64 {
|
|
||||||
res := make(chan int64)
|
|
||||||
c.control <- getSize{res}
|
|
||||||
return <-res
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *LayeredCache[T]) restart() {
|
|
||||||
c.promotables = make(chan *Item[T], c.promoteBuffer)
|
|
||||||
c.control = make(chan interface{})
|
|
||||||
go c.worker()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.Duration, track bool) *Item[T] {
|
func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.Duration, track bool) *Item[T] {
|
||||||
item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
|
item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
@@ -257,14 +201,65 @@ func (c *LayeredCache[T]) promote(item *Item[T]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *LayeredCache[T]) worker() {
|
func (c *LayeredCache[T]) worker() {
|
||||||
defer close(c.control)
|
|
||||||
dropped := 0
|
dropped := 0
|
||||||
|
cc := c.control
|
||||||
|
|
||||||
promoteItem := func(item *Item[T]) {
|
promoteItem := func(item *Item[T]) {
|
||||||
if c.doPromote(item) && c.size > c.maxSize {
|
if c.doPromote(item) && c.size > c.maxSize {
|
||||||
dropped += c.gc()
|
dropped += c.gc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deleteItem := func(item *Item[T]) {
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case item := <-c.promotables:
|
||||||
|
promoteItem(item)
|
||||||
|
case item := <-c.deletables:
|
||||||
|
c.doDelete(item)
|
||||||
|
case control := <-cc:
|
||||||
|
switch msg := control.(type) {
|
||||||
|
case controlStop:
|
||||||
|
goto drain
|
||||||
|
case controlGetDropped:
|
||||||
|
msg.res <- dropped
|
||||||
|
dropped = 0
|
||||||
|
case controlSetMaxSize:
|
||||||
|
c.maxSize = msg.size
|
||||||
|
if c.size > c.maxSize {
|
||||||
|
dropped += c.gc()
|
||||||
|
}
|
||||||
|
msg.done <- struct{}{}
|
||||||
|
case controlClear:
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.clear()
|
||||||
|
}
|
||||||
|
c.size = 0
|
||||||
|
c.list = NewList[*Item[T]]()
|
||||||
|
msg.done <- struct{}{}
|
||||||
|
case controlGetSize:
|
||||||
|
msg.res <- c.size
|
||||||
|
case controlGC:
|
||||||
|
dropped += c.gc()
|
||||||
|
msg.done <- struct{}{}
|
||||||
|
case controlSyncUpdates:
|
||||||
|
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||||
|
msg.done <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drain:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case item := <-c.deletables:
|
||||||
|
c.doDelete(item)
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *LayeredCache[T]) doDelete(item *Item[T]) {
|
||||||
if item.node == nil {
|
if item.node == nil {
|
||||||
item.promotions = -2
|
item.promotions = -2
|
||||||
} else {
|
} else {
|
||||||
@@ -277,46 +272,6 @@ func (c *LayeredCache[T]) worker() {
|
|||||||
item.promotions = -2
|
item.promotions = -2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case item, ok := <-c.promotables:
|
|
||||||
if ok == false {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
promoteItem(item)
|
|
||||||
case item := <-c.deletables:
|
|
||||||
deleteItem(item)
|
|
||||||
case control := <-c.control:
|
|
||||||
switch msg := control.(type) {
|
|
||||||
case getDropped:
|
|
||||||
msg.res <- dropped
|
|
||||||
dropped = 0
|
|
||||||
case setMaxSize:
|
|
||||||
c.maxSize = msg.size
|
|
||||||
if c.size > c.maxSize {
|
|
||||||
dropped += c.gc()
|
|
||||||
}
|
|
||||||
msg.done <- struct{}{}
|
|
||||||
case clear:
|
|
||||||
for _, bucket := range c.buckets {
|
|
||||||
bucket.clear()
|
|
||||||
}
|
|
||||||
c.size = 0
|
|
||||||
c.list = NewList[*Item[T]]()
|
|
||||||
msg.done <- struct{}{}
|
|
||||||
case getSize:
|
|
||||||
msg.res <- c.size
|
|
||||||
case gc:
|
|
||||||
dropped += c.gc()
|
|
||||||
msg.done <- struct{}{}
|
|
||||||
case syncWorker:
|
|
||||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
|
|
||||||
c.deletables, deleteItem)
|
|
||||||
msg.done <- struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *LayeredCache[T]) doPromote(item *Item[T]) bool {
|
func (c *LayeredCache[T]) doPromote(item *Item[T]) bool {
|
||||||
// deleted before it ever got promoted
|
// deleted before it ever got promoted
|
||||||
|
@@ -396,6 +396,29 @@ func Test_LayeredCachePrune(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_LayeredConcurrentStop(t *testing.T) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
cache := Layered(Configure[string]())
|
||||||
|
r := func() {
|
||||||
|
for {
|
||||||
|
key := strconv.Itoa(int(rand.Int31n(100)))
|
||||||
|
switch rand.Int31n(3) {
|
||||||
|
case 0:
|
||||||
|
cache.Get(key, key)
|
||||||
|
case 1:
|
||||||
|
cache.Set(key, key, key, time.Minute)
|
||||||
|
case 2:
|
||||||
|
cache.Delete(key, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go r()
|
||||||
|
go r()
|
||||||
|
go r()
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
cache.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
func newLayered[T any]() *LayeredCache[T] {
|
func newLayered[T any]() *LayeredCache[T] {
|
||||||
c := Layered[T](Configure[T]())
|
c := Layered[T](Configure[T]())
|
||||||
c.Clear()
|
c.Clear()
|
||||||
|
Reference in New Issue
Block a user