move GC and GetSize to control commands

This commit is contained in:
Karl Seguin
2021-03-20 18:57:11 +08:00
parent 934f76bc44
commit 325d078286
5 changed files with 98 additions and 54 deletions

View File

@@ -14,8 +14,13 @@ type getDropped struct {
res chan int res chan int
} }
type getSize struct {
res chan int64
}
type setMaxSize struct { type setMaxSize struct {
size int64 size int64
done chan struct{}
} }
type clear struct { type clear struct {
@@ -26,6 +31,10 @@ type syncWorker struct {
done chan struct{} done chan struct{}
} }
type gc struct {
done chan struct{}
}
type Cache struct { type Cache struct {
*Configuration *Configuration
list *list.List list *list.List
@@ -167,6 +176,7 @@ func (c *Cache) Delete(key string) bool {
} }
// Clears the cache // Clears the cache
// This is a control command.
func (c *Cache) Clear() { func (c *Cache) Clear() {
done := make(chan struct{}) done := make(chan struct{})
c.control <- clear{done: done} c.control <- clear{done: done}
@@ -175,6 +185,7 @@ func (c *Cache) Clear() {
// Stops the background worker. Operations performed on the cache after Stop // Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic // is called are likely to panic
// This is a control command.
func (c *Cache) Stop() { func (c *Cache) Stop() {
close(c.promotables) close(c.promotables)
<-c.control <-c.control
@@ -182,6 +193,7 @@ func (c *Cache) Stop() {
// Gets the number of items removed from the cache due to memory pressure since // Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called // the last time GetDropped was called
// This is a control command.
func (c *Cache) GetDropped() int { func (c *Cache) GetDropped() int {
return doGetDropped(c.control) return doGetDropped(c.control)
} }
@@ -205,6 +217,7 @@ func doGetDropped(controlCh chan<- interface{}) int {
// This applies only to cache methods that were previously called by the same goroutine that is // This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is // now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns. // no way to know whether any of them still have pending state updates when SyncUpdates returns.
// This is a control command.
func (c *Cache) SyncUpdates() { func (c *Cache) SyncUpdates() {
doSyncUpdates(c.control) doSyncUpdates(c.control)
} }
@@ -217,8 +230,30 @@ func doSyncUpdates(controlCh chan<- interface{}) {
// Sets a new max size. That can result in a GC being run if the new maxium size // Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size // is smaller than the cached size
// This is a control command.
func (c *Cache) SetMaxSize(size int64) { func (c *Cache) SetMaxSize(size int64) {
c.control <- setMaxSize{size} done := make(chan struct{})
c.control <- setMaxSize{size: size, done: done}
<-done
}
// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c *Cache) GC() {
done := make(chan struct{})
c.control <- gc{done: done}
<-done
}
// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c *Cache) GetSize() int64 {
res := make(chan int64)
c.control <- getSize{res}
return <-res
} }
func (c *Cache) restart() { func (c *Cache) restart() {
@@ -283,6 +318,7 @@ func (c *Cache) worker() {
if c.size > c.maxSize { if c.size > c.maxSize {
dropped += c.gc() dropped += c.gc()
} }
msg.done <- struct{}{}
case clear: case clear:
for _, bucket := range c.buckets { for _, bucket := range c.buckets {
bucket.clear() bucket.clear()
@@ -290,6 +326,11 @@ func (c *Cache) worker() {
c.size = 0 c.size = 0
c.list = list.New() c.list = list.New()
msg.done <- struct{}{} msg.done <- struct{}{}
case getSize:
msg.res <- c.size
case gc:
dropped += c.gc()
msg.done <- struct{}{}
case syncWorker: case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, c.doDelete) c.deletables, c.doDelete)

View File

@@ -124,7 +124,7 @@ func (_ CacheTests) GCsTheOldestItems() {
cache.Set(strconv.Itoa(i), i, time.Minute) cache.Set(strconv.Itoa(i), i, time.Minute)
} }
cache.SyncUpdates() cache.SyncUpdates()
gcCache(cache) cache.GC()
Expect(cache.Get("9")).To.Equal(nil) Expect(cache.Get("9")).To.Equal(nil)
Expect(cache.Get("10").Value()).To.Equal(10) Expect(cache.Get("10").Value()).To.Equal(10)
Expect(cache.ItemCount()).To.Equal(490) Expect(cache.ItemCount()).To.Equal(490)
@@ -138,7 +138,7 @@ func (_ CacheTests) PromotedItemsDontGetPruned() {
cache.SyncUpdates() cache.SyncUpdates()
cache.Get("9") cache.Get("9")
cache.SyncUpdates() cache.SyncUpdates()
gcCache(cache) cache.GC()
Expect(cache.Get("9").Value()).To.Equal(9) Expect(cache.Get("9").Value()).To.Equal(9)
Expect(cache.Get("10")).To.Equal(nil) Expect(cache.Get("10")).To.Equal(nil)
Expect(cache.Get("11").Value()).To.Equal(11) Expect(cache.Get("11").Value()).To.Equal(11)
@@ -152,12 +152,12 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() {
} }
item1 := cache.TrackingGet("1") item1 := cache.TrackingGet("1")
cache.SyncUpdates() cache.SyncUpdates()
gcCache(cache) cache.GC()
Expect(cache.Get("0").Value()).To.Equal(0) Expect(cache.Get("0").Value()).To.Equal(0)
Expect(cache.Get("1").Value()).To.Equal(1) Expect(cache.Get("1").Value()).To.Equal(1)
item0.Release() item0.Release()
item1.Release() item1.Release()
gcCache(cache) cache.GC()
Expect(cache.Get("0")).To.Equal(nil) Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil) Expect(cache.Get("1")).To.Equal(nil)
} }
@@ -202,19 +202,19 @@ func (_ CacheTests) SetUpdatesSizeOnDelta() {
cache.Set("a", &SizedItem{0, 2}, time.Minute) cache.Set("a", &SizedItem{0, 2}, time.Minute)
cache.Set("b", &SizedItem{0, 3}, time.Minute) cache.Set("b", &SizedItem{0, 3}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
cache.Set("b", &SizedItem{0, 3}, time.Minute) cache.Set("b", &SizedItem{0, 3}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
cache.Set("b", &SizedItem{0, 4}, time.Minute) cache.Set("b", &SizedItem{0, 4}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 6) Expect(cache.GetSize()).To.Eql(6)
cache.Set("b", &SizedItem{0, 2}, time.Minute) cache.Set("b", &SizedItem{0, 2}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 4) Expect(cache.GetSize()).To.Eql(4)
cache.Delete("b") cache.Delete("b")
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 2) Expect(cache.GetSize()).To.Eql(2)
} }
func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
@@ -224,7 +224,7 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("3", &SizedItem{1, 2}, time.Minute) cache.Set("3", &SizedItem{1, 2}, time.Minute)
cache.Replace("4", &SizedItem{1, 2}) cache.Replace("4", &SizedItem{1, 2})
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 6) Expect(cache.GetSize()).To.Eql(6)
} }
func (_ CacheTests) ReplaceChangesSize() { func (_ CacheTests) ReplaceChangesSize() {
@@ -234,15 +234,15 @@ func (_ CacheTests) ReplaceChangesSize() {
cache.Replace("2", &SizedItem{1, 2}) cache.Replace("2", &SizedItem{1, 2})
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 4) Expect(cache.GetSize()).To.Eql(4)
cache.Replace("2", &SizedItem{1, 1}) cache.Replace("2", &SizedItem{1, 1})
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 3) Expect(cache.GetSize()).To.Eql(3)
cache.Replace("2", &SizedItem{1, 3}) cache.Replace("2", &SizedItem{1, 3})
cache.SyncUpdates() cache.SyncUpdates()
checkSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
} }
func (_ CacheTests) ResizeOnTheFly() { func (_ CacheTests) ResizeOnTheFly() {
@@ -314,18 +314,6 @@ func (s *SizedItem) Size() int64 {
return s.s return s.s
} }
func checkSize(cache *Cache, sz int64) {
cache.Stop()
Expect(cache.size).To.Equal(sz)
cache.restart()
}
func gcCache(cache *Cache) {
cache.Stop()
cache.gc()
cache.restart()
}
func forEachKeys(cache *Cache) []string { func forEachKeys(cache *Cache) []string {
keys := make([]string, 0, 10) keys := make([]string, 0, 10)
cache.ForEachFunc(func(key string, i *Item) bool { cache.ForEachFunc(func(key string, i *Item) bool {

View File

@@ -198,7 +198,28 @@ func (c *LayeredCache) SyncUpdates() {
// Sets a new max size. That can result in a GC being run if the new maxium size // Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size // is smaller than the cached size
func (c *LayeredCache) SetMaxSize(size int64) { func (c *LayeredCache) SetMaxSize(size int64) {
c.control <- setMaxSize{size} done := make(chan struct{})
c.control <- setMaxSize{size: size, done: done}
<-done
}
// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c *LayeredCache) GC() {
done := make(chan struct{})
c.control <- gc{done: done}
<-done
}
// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c *LayeredCache) GetSize() int64 {
res := make(chan int64)
c.control <- getSize{res}
return <-res
} }
func (c *LayeredCache) restart() { func (c *LayeredCache) restart() {
@@ -264,6 +285,7 @@ func (c *LayeredCache) worker() {
if c.size > c.maxSize { if c.size > c.maxSize {
dropped += c.gc() dropped += c.gc()
} }
msg.done <- struct{}{}
case clear: case clear:
for _, bucket := range c.buckets { for _, bucket := range c.buckets {
bucket.clear() bucket.clear()
@@ -271,6 +293,11 @@ func (c *LayeredCache) worker() {
c.size = 0 c.size = 0
c.list = list.New() c.list = list.New()
msg.done <- struct{}{} msg.done <- struct{}{}
case getSize:
msg.res <- c.size
case gc:
dropped += c.gc()
msg.done <- struct{}{}
case syncWorker: case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, deleteItem) c.deletables, deleteItem)

View File

@@ -172,7 +172,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() {
cache.Set("xx", "b", 9001, time.Minute) cache.Set("xx", "b", 9001, time.Minute)
//let the items get promoted (and added to our list) //let the items get promoted (and added to our list)
cache.SyncUpdates() cache.SyncUpdates()
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "a")).To.Equal(nil)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
Expect(cache.Get("8", "a")).To.Equal(nil) Expect(cache.Get("8", "a")).To.Equal(nil)
@@ -188,7 +188,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() {
cache.SyncUpdates() cache.SyncUpdates()
cache.Get("9", "a") cache.Get("9", "a")
cache.SyncUpdates() cache.SyncUpdates()
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("9", "a").Value()).To.Equal(9)
Expect(cache.Get("10", "a")).To.Equal(nil) Expect(cache.Get("10", "a")).To.Equal(nil)
Expect(cache.Get("11", "a").Value()).To.Equal(11) Expect(cache.Get("11", "a").Value()).To.Equal(11)
@@ -202,12 +202,12 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
} }
item1 := cache.TrackingGet("1", "a") item1 := cache.TrackingGet("1", "a")
cache.SyncUpdates() cache.SyncUpdates()
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a").Value()).To.Equal(1) Expect(cache.Get("1", "a").Value()).To.Equal(1)
item0.Release() item0.Release()
item1.Release() item1.Release()
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("0", "a")).To.Equal(nil)
Expect(cache.Get("1", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil)
} }
@@ -282,20 +282,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() {
cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 6) Expect(cache.GetSize()).To.Eql(6)
cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute)
cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute)
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 7) Expect(cache.GetSize()).To.Eql(7)
cache.Delete("pri", "b") cache.Delete("pri", "b")
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
} }
func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
@@ -305,7 +305,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute)
cache.Replace("sec", "3", &SizedItem{1, 2}) cache.Replace("sec", "3", &SizedItem{1, 2})
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 6) Expect(cache.GetSize()).To.Eql(6)
} }
func (_ LayeredCacheTests) ReplaceChangesSize() { func (_ LayeredCacheTests) ReplaceChangesSize() {
@@ -315,15 +315,15 @@ func (_ LayeredCacheTests) ReplaceChangesSize() {
cache.Replace("pri", "2", &SizedItem{1, 2}) cache.Replace("pri", "2", &SizedItem{1, 2})
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 4) Expect(cache.GetSize()).To.Eql(4)
cache.Replace("pri", "2", &SizedItem{1, 1}) cache.Replace("pri", "2", &SizedItem{1, 1})
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 3) Expect(cache.GetSize()).To.Eql(3)
cache.Replace("pri", "2", &SizedItem{1, 3}) cache.Replace("pri", "2", &SizedItem{1, 3})
cache.SyncUpdates() cache.SyncUpdates()
checkLayeredSize(cache, 5) Expect(cache.GetSize()).To.Eql(5)
} }
func (_ LayeredCacheTests) EachFunc() { func (_ LayeredCacheTests) EachFunc() {
@@ -361,18 +361,6 @@ func newLayered() *LayeredCache {
return c return c
} }
func checkLayeredSize(cache *LayeredCache, sz int64) {
cache.Stop()
Expect(cache.size).To.Equal(sz)
cache.restart()
}
func gcLayeredCache(cache *LayeredCache) {
cache.Stop()
cache.gc()
cache.restart()
}
func forEachKeysLayered(cache *LayeredCache, primary string) []string { func forEachKeysLayered(cache *LayeredCache, primary string) []string {
keys := make([]string, 0, 10) keys := make([]string, 0, 10)
cache.ForEachFunc(primary, func(key string, i *Item) bool { cache.ForEachFunc(primary, func(key string, i *Item) bool {

View File

@@ -97,10 +97,10 @@ func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() {
sCache := cache.GetOrCreateSecondaryCache("0") sCache := cache.GetOrCreateSecondaryCache("0")
item := sCache.TrackingGet("a") item := sCache.TrackingGet("a")
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release() item.Release()
gcLayeredCache(cache) cache.GC()
Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("0", "a")).To.Equal(nil)
} }