diff --git a/cache.go b/cache.go index 4b2f474..38f567e 100644 --- a/cache.go +++ b/cache.go @@ -14,8 +14,13 @@ type getDropped struct { res chan int } +type getSize struct { + res chan int64 +} + type setMaxSize struct { size int64 + done chan struct{} } type clear struct { @@ -26,6 +31,10 @@ type syncWorker struct { done chan struct{} } +type gc struct { + done chan struct{} +} + type Cache struct { *Configuration list *list.List @@ -167,6 +176,7 @@ func (c *Cache) Delete(key string) bool { } // Clears the cache +// This is a control command. func (c *Cache) Clear() { done := make(chan struct{}) c.control <- clear{done: done} @@ -175,6 +185,7 @@ func (c *Cache) Clear() { // Stops the background worker. Operations performed on the cache after Stop // is called are likely to panic +// This is a control command. func (c *Cache) Stop() { close(c.promotables) <-c.control @@ -182,6 +193,7 @@ func (c *Cache) Stop() { // Gets the number of items removed from the cache due to memory pressure since // the last time GetDropped was called +// This is a control command. func (c *Cache) GetDropped() int { return doGetDropped(c.control) } @@ -205,6 +217,7 @@ func doGetDropped(controlCh chan<- interface{}) int { // This applies only to cache methods that were previously called by the same goroutine that is // now calling SyncUpdates. If other goroutines are using the cache at the same time, there is // no way to know whether any of them still have pending state updates when SyncUpdates returns. +// This is a control command. func (c *Cache) SyncUpdates() { doSyncUpdates(c.control) } @@ -217,8 +230,30 @@ func doSyncUpdates(controlCh chan<- interface{}) { // Sets a new max size. That can result in a GC being run if the new maxium size // is smaller than the cached size +// This is a control command. func (c *Cache) SetMaxSize(size int64) { - c.control <- setMaxSize{size} + done := make(chan struct{}) + c.control <- setMaxSize{size: size, done: done} + <-done +} + +// Forces GC. There should be no reason to call this function, except from tests +// which require synchronous GC. +// This is a control command. +func (c *Cache) GC() { + done := make(chan struct{}) + c.control <- gc{done: done} + <-done +} + +// Gets the size of the cache. This is an O(1) call to make, but it is handled +// by the worker goroutine. It's meant to be called periodically for metrics, or +// from tests. +// This is a control command. +func (c *Cache) GetSize() int64 { + res := make(chan int64) + c.control <- getSize{res} + return <-res } func (c *Cache) restart() { @@ -283,6 +318,7 @@ func (c *Cache) worker() { if c.size > c.maxSize { dropped += c.gc() } + msg.done <- struct{}{} case clear: for _, bucket := range c.buckets { bucket.clear() @@ -290,6 +326,11 @@ func (c *Cache) worker() { c.size = 0 c.list = list.New() msg.done <- struct{}{} + case getSize: + msg.res <- c.size + case gc: + dropped += c.gc() + msg.done <- struct{}{} case syncWorker: doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete) diff --git a/cache_test.go b/cache_test.go index 2b0ee9b..c59153f 100644 --- a/cache_test.go +++ b/cache_test.go @@ -124,7 +124,7 @@ func (_ CacheTests) GCsTheOldestItems() { cache.Set(strconv.Itoa(i), i, time.Minute) } cache.SyncUpdates() - gcCache(cache) + cache.GC() Expect(cache.Get("9")).To.Equal(nil) Expect(cache.Get("10").Value()).To.Equal(10) Expect(cache.ItemCount()).To.Equal(490) @@ -138,7 +138,7 @@ func (_ CacheTests) PromotedItemsDontGetPruned() { cache.SyncUpdates() cache.Get("9") cache.SyncUpdates() - gcCache(cache) + cache.GC() Expect(cache.Get("9").Value()).To.Equal(9) Expect(cache.Get("10")).To.Equal(nil) Expect(cache.Get("11").Value()).To.Equal(11) @@ -152,12 +152,12 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() { } item1 := cache.TrackingGet("1") cache.SyncUpdates() - gcCache(cache) + cache.GC() Expect(cache.Get("0").Value()).To.Equal(0) Expect(cache.Get("1").Value()).To.Equal(1) item0.Release() item1.Release() - gcCache(cache) + cache.GC() Expect(cache.Get("0")).To.Equal(nil) Expect(cache.Get("1")).To.Equal(nil) } @@ -202,19 +202,19 @@ func (_ CacheTests) SetUpdatesSizeOnDelta() { cache.Set("a", &SizedItem{0, 2}, time.Minute) cache.Set("b", &SizedItem{0, 3}, time.Minute) cache.SyncUpdates() - checkSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) cache.Set("b", &SizedItem{0, 3}, time.Minute) cache.SyncUpdates() - checkSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) cache.Set("b", &SizedItem{0, 4}, time.Minute) cache.SyncUpdates() - checkSize(cache, 6) + Expect(cache.GetSize()).To.Eql(6) cache.Set("b", &SizedItem{0, 2}, time.Minute) cache.SyncUpdates() - checkSize(cache, 4) + Expect(cache.GetSize()).To.Eql(4) cache.Delete("b") cache.SyncUpdates() - checkSize(cache, 2) + Expect(cache.GetSize()).To.Eql(2) } func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { @@ -224,7 +224,7 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("3", &SizedItem{1, 2}, time.Minute) cache.Replace("4", &SizedItem{1, 2}) cache.SyncUpdates() - checkSize(cache, 6) + Expect(cache.GetSize()).To.Eql(6) } func (_ CacheTests) ReplaceChangesSize() { @@ -234,15 +234,15 @@ func (_ CacheTests) ReplaceChangesSize() { cache.Replace("2", &SizedItem{1, 2}) cache.SyncUpdates() - checkSize(cache, 4) + Expect(cache.GetSize()).To.Eql(4) cache.Replace("2", &SizedItem{1, 1}) cache.SyncUpdates() - checkSize(cache, 3) + Expect(cache.GetSize()).To.Eql(3) cache.Replace("2", &SizedItem{1, 3}) cache.SyncUpdates() - checkSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) } func (_ CacheTests) ResizeOnTheFly() { @@ -314,18 +314,6 @@ func (s *SizedItem) Size() int64 { return s.s } -func checkSize(cache *Cache, sz int64) { - cache.Stop() - Expect(cache.size).To.Equal(sz) - cache.restart() -} - -func gcCache(cache *Cache) { - cache.Stop() - cache.gc() - cache.restart() -} - func forEachKeys(cache *Cache) []string { keys := make([]string, 0, 10) cache.ForEachFunc(func(key string, i *Item) bool { diff --git a/layeredcache.go b/layeredcache.go index 8573753..50a3682 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -198,7 +198,28 @@ func (c *LayeredCache) SyncUpdates() { // Sets a new max size. That can result in a GC being run if the new maxium size // is smaller than the cached size func (c *LayeredCache) SetMaxSize(size int64) { - c.control <- setMaxSize{size} + done := make(chan struct{}) + c.control <- setMaxSize{size: size, done: done} + <-done +} + +// Forces GC. There should be no reason to call this function, except from tests +// which require synchronous GC. +// This is a control command. +func (c *LayeredCache) GC() { + done := make(chan struct{}) + c.control <- gc{done: done} + <-done +} + +// Gets the size of the cache. This is an O(1) call to make, but it is handled +// by the worker goroutine. It's meant to be called periodically for metrics, or +// from tests. +// This is a control command. +func (c *LayeredCache) GetSize() int64 { + res := make(chan int64) + c.control <- getSize{res} + return <-res } func (c *LayeredCache) restart() { @@ -264,6 +285,7 @@ func (c *LayeredCache) worker() { if c.size > c.maxSize { dropped += c.gc() } + msg.done <- struct{}{} case clear: for _, bucket := range c.buckets { bucket.clear() @@ -271,6 +293,11 @@ func (c *LayeredCache) worker() { c.size = 0 c.list = list.New() msg.done <- struct{}{} + case getSize: + msg.res <- c.size + case gc: + dropped += c.gc() + msg.done <- struct{}{} case syncWorker: doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, deleteItem) diff --git a/layeredcache_test.go b/layeredcache_test.go index feb41ba..2fa12fa 100644 --- a/layeredcache_test.go +++ b/layeredcache_test.go @@ -172,7 +172,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() { cache.Set("xx", "b", 9001, time.Minute) //let the items get promoted (and added to our list) cache.SyncUpdates() - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("8", "a")).To.Equal(nil) @@ -188,7 +188,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() { cache.SyncUpdates() cache.Get("9", "a") cache.SyncUpdates() - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("10", "a")).To.Equal(nil) Expect(cache.Get("11", "a").Value()).To.Equal(11) @@ -202,12 +202,12 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() { } item1 := cache.TrackingGet("1", "a") cache.SyncUpdates() - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a").Value()).To.Equal(1) item0.Release() item1.Release() - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil) } @@ -282,20 +282,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() { cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.SyncUpdates() - checkLayeredSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.SyncUpdates() - checkLayeredSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) cache.SyncUpdates() - checkLayeredSize(cache, 6) + Expect(cache.GetSize()).To.Eql(6) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) cache.SyncUpdates() - checkLayeredSize(cache, 7) + Expect(cache.GetSize()).To.Eql(7) cache.Delete("pri", "b") cache.SyncUpdates() - checkLayeredSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) } func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { @@ -305,7 +305,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Replace("sec", "3", &SizedItem{1, 2}) cache.SyncUpdates() - checkLayeredSize(cache, 6) + Expect(cache.GetSize()).To.Eql(6) } func (_ LayeredCacheTests) ReplaceChangesSize() { @@ -315,15 +315,15 @@ func (_ LayeredCacheTests) ReplaceChangesSize() { cache.Replace("pri", "2", &SizedItem{1, 2}) cache.SyncUpdates() - checkLayeredSize(cache, 4) + Expect(cache.GetSize()).To.Eql(4) cache.Replace("pri", "2", &SizedItem{1, 1}) cache.SyncUpdates() - checkLayeredSize(cache, 3) + Expect(cache.GetSize()).To.Eql(3) cache.Replace("pri", "2", &SizedItem{1, 3}) cache.SyncUpdates() - checkLayeredSize(cache, 5) + Expect(cache.GetSize()).To.Eql(5) } func (_ LayeredCacheTests) EachFunc() { @@ -361,18 +361,6 @@ func newLayered() *LayeredCache { return c } -func checkLayeredSize(cache *LayeredCache, sz int64) { - cache.Stop() - Expect(cache.size).To.Equal(sz) - cache.restart() -} - -func gcLayeredCache(cache *LayeredCache) { - cache.Stop() - cache.gc() - cache.restart() -} - func forEachKeysLayered(cache *LayeredCache, primary string) []string { keys := make([]string, 0, 10) cache.ForEachFunc(primary, func(key string, i *Item) bool { diff --git a/secondarycache_test.go b/secondarycache_test.go index 09adbc5..e729415 100644 --- a/secondarycache_test.go +++ b/secondarycache_test.go @@ -97,10 +97,10 @@ func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() { sCache := cache.GetOrCreateSecondaryCache("0") item := sCache.TrackingGet("a") time.Sleep(time.Millisecond * 10) - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a")).To.Equal(nil) item.Release() - gcLayeredCache(cache) + cache.GC() Expect(cache.Get("0", "a")).To.Equal(nil) }