diff --git a/cache.go b/cache.go index 0f87c92..b3e6969 100644 --- a/cache.go +++ b/cache.go @@ -13,6 +13,7 @@ import ( type getDropped struct { res chan int } + type setMaxSize struct { size int64 } @@ -21,6 +22,10 @@ type clear struct { done chan struct{} } +type syncWorker struct { + done chan struct{} +} + type Cache struct { *Configuration list *list.List @@ -175,11 +180,38 @@ func (c *Cache) Stop() { // Gets the number of items removed from the cache due to memory pressure since // the last time GetDropped was called func (c *Cache) GetDropped() int { + return doGetDropped(c.control) +} + +func doGetDropped(controlCh chan<- interface{}) int { res := make(chan int) - c.control <- getDropped{res: res} + controlCh <- getDropped{res: res} return <-res } +// SyncUpdates waits until the cache has finished asynchronous state updates for any operations +// that were done by the current goroutine up to now. +// +// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker +// goroutine that updates its internal data structures asynchronously. This means that the +// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent; +// there is no guarantee that it happens before a Get or Set call has returned. Most of the time +// application code will not care about this, but especially in a test scenario you may want to +// be able to know when the worker has caught up. +// +// This applies only to cache methods that were previously called by the same goroutine that is +// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is +// no way to know whether any of them still have pending state updates when SyncUpdates returns. +func (c *Cache) SyncUpdates() { + doSyncUpdates(c.control) +} + +func doSyncUpdates(controlCh chan<- interface{}) { + done := make(chan struct{}) + controlCh <- syncWorker{done: done} + <-done +} + // Sets a new max size. That can result in a GC being run if the new maxium size // is smaller than the cached size func (c *Cache) SetMaxSize(size int64) { @@ -224,15 +256,18 @@ func (c *Cache) promote(item *Item) { func (c *Cache) worker() { defer close(c.control) dropped := 0 + promoteItem := func(item *Item) { + if c.doPromote(item) && c.size > c.maxSize { + dropped += c.gc() + } + } for { select { case item, ok := <-c.promotables: if ok == false { goto drain } - if c.doPromote(item) && c.size > c.maxSize { - dropped += c.gc() - } + promoteItem(item) case item := <-c.deletables: c.doDelete(item) case control := <-c.control: @@ -252,6 +287,10 @@ func (c *Cache) worker() { c.size = 0 c.list = list.New() msg.done <- struct{}{} + case syncWorker: + doAllPendingPromotesAndDeletes(c.promotables, promoteItem, + c.deletables, c.doDelete) + msg.done <- struct{}{} } } } @@ -268,6 +307,39 @@ drain: } } +// This method is used to implement SyncUpdates. It simply receives and processes as many +// items as it can receive from the promotables and deletables channels immediately without +// blocking. If some other goroutine sends an item on either channel after this method has +// finished receiving, that's OK, because SyncUpdates only guarantees processing of values +// that were already sent by the same goroutine. +func doAllPendingPromotesAndDeletes( + promotables <-chan *Item, + promoteFn func(*Item), + deletables <-chan *Item, + deleteFn func(*Item), +) { +doAllPromotes: + for { + select { + case item := <-promotables: + if item != nil { + promoteFn(item) + } + default: + break doAllPromotes + } + } +doAllDeletes: + for { + select { + case item := <-deletables: + deleteFn(item) + default: + break doAllDeletes + } + } +} + func (c *Cache) doDelete(item *Item) { if item.element == nil { item.promotions = -2 diff --git a/cache_test.go b/cache_test.go index 921bd4d..a85e914 100644 --- a/cache_test.go +++ b/cache_test.go @@ -18,6 +18,7 @@ func Test_Cache(t *testing.T) { func (_ CacheTests) DeletesAValue() { cache := New(Configure()) + defer cache.Stop() Expect(cache.ItemCount()).To.Equal(0) cache.Set("spice", "flow", time.Minute) @@ -32,6 +33,7 @@ func (_ CacheTests) DeletesAValue() { func (_ CacheTests) DeletesAPrefix() { cache := New(Configure()) + defer cache.Stop() Expect(cache.ItemCount()).To.Equal(0) cache.Set("aaa", "1", time.Minute) @@ -55,6 +57,7 @@ func (_ CacheTests) DeletesAPrefix() { func (_ CacheTests) DeletesAFunc() { cache := New(Configure()) + defer cache.Stop() Expect(cache.ItemCount()).To.Equal(0) cache.Set("a", 1, time.Minute) @@ -91,12 +94,14 @@ func (_ CacheTests) OnDeleteCallbackCalled() { } cache := New(Configure().OnDelete(onDeleteFn)) + defer cache.Stop() cache.Set("spice", "flow", time.Minute) cache.Set("worm", "sand", time.Minute) - time.Sleep(time.Millisecond * 10) // Run once to init + cache.SyncUpdates() // wait for worker to pick up preceding updates + cache.Delete("spice") - time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items + cache.SyncUpdates() Expect(cache.Get("spice")).To.Equal(nil) Expect(cache.Get("worm").Value()).To.Equal("sand") @@ -105,6 +110,7 @@ func (_ CacheTests) OnDeleteCallbackCalled() { func (_ CacheTests) FetchesExpiredItems() { cache := New(Configure()) + defer cache.Stop() fn := func() (interface{}, error) { return "moo-moo", nil } cache.Set("beef", "moo", time.Second*-1) @@ -116,11 +122,11 @@ func (_ CacheTests) FetchesExpiredItems() { func (_ CacheTests) GCsTheOldestItems() { cache := New(Configure().ItemsToPrune(10)) + defer cache.Stop() for i := 0; i < 500; i++ { cache.Set(strconv.Itoa(i), i, time.Minute) } - //let the items get promoted (and added to our list) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcCache(cache) Expect(cache.Get("9")).To.Equal(nil) Expect(cache.Get("10").Value()).To.Equal(10) @@ -129,12 +135,13 @@ func (_ CacheTests) GCsTheOldestItems() { func (_ CacheTests) PromotedItemsDontGetPruned() { cache := New(Configure().ItemsToPrune(10).GetsPerPromote(1)) + defer cache.Stop() for i := 0; i < 500; i++ { cache.Set(strconv.Itoa(i), i, time.Minute) } - time.Sleep(time.Millisecond * 10) //run the worker once to init the list + cache.SyncUpdates() cache.Get("9") - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcCache(cache) Expect(cache.Get("9").Value()).To.Equal(9) Expect(cache.Get("10")).To.Equal(nil) @@ -148,7 +155,7 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() { cache.Set(strconv.Itoa(i), i, time.Minute) } item1 := cache.TrackingGet("1") - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcCache(cache) Expect(cache.Get("0").Value()).To.Equal(0) Expect(cache.Get("1").Value()).To.Equal(1) @@ -171,7 +178,7 @@ func (_ CacheTests) RemovesOldestItemWhenFull() { for i := 0; i < 7; i++ { cache.Set(strconv.Itoa(i), i, time.Minute) } - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.Get("0")).To.Equal(nil) Expect(cache.Get("1")).To.Equal(nil) Expect(cache.Get("2").Value()).To.Equal(2) @@ -184,7 +191,7 @@ func (_ CacheTests) RemovesOldestItemWhenFullBySizer() { for i := 0; i < 7; i++ { cache.Set(strconv.Itoa(i), &SizedItem{i, 2}, time.Minute) } - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.Get("0")).To.Equal(nil) Expect(cache.Get("1")).To.Equal(nil) Expect(cache.Get("2")).To.Equal(nil) @@ -198,19 +205,19 @@ func (_ CacheTests) SetUpdatesSizeOnDelta() { cache := New(Configure()) cache.Set("a", &SizedItem{0, 2}, time.Minute) cache.Set("b", &SizedItem{0, 3}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 5) cache.Set("b", &SizedItem{0, 3}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 5) cache.Set("b", &SizedItem{0, 4}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 6) cache.Set("b", &SizedItem{0, 2}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 4) cache.Delete("b") - time.Sleep(time.Millisecond * 100) + cache.SyncUpdates() checkSize(cache, 2) } @@ -220,7 +227,7 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("2", &SizedItem{1, 2}, time.Minute) cache.Set("3", &SizedItem{1, 2}, time.Minute) cache.Replace("4", &SizedItem{1, 2}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 6) } @@ -230,15 +237,15 @@ func (_ CacheTests) ReplaceChangesSize() { cache.Set("2", &SizedItem{1, 2}, time.Minute) cache.Replace("2", &SizedItem{1, 2}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 4) cache.Replace("2", &SizedItem{1, 1}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 3) cache.Replace("2", &SizedItem{1, 3}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkSize(cache, 5) } @@ -248,7 +255,7 @@ func (_ CacheTests) ResizeOnTheFly() { cache.Set(strconv.Itoa(i), i, time.Minute) } cache.SetMaxSize(3) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(2) Expect(cache.Get("0")).To.Equal(nil) Expect(cache.Get("1")).To.Equal(nil) @@ -257,7 +264,7 @@ func (_ CacheTests) ResizeOnTheFly() { Expect(cache.Get("4").Value()).To.Equal(4) cache.Set("5", 5, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(1) Expect(cache.Get("2")).To.Equal(nil) Expect(cache.Get("3").Value()).To.Equal(3) @@ -266,7 +273,7 @@ func (_ CacheTests) ResizeOnTheFly() { cache.SetMaxSize(10) cache.Set("6", 6, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(0) Expect(cache.Get("3").Value()).To.Equal(3) Expect(cache.Get("4").Value()).To.Equal(4) @@ -282,23 +289,23 @@ func (_ CacheTests) ForEachFunc() { Expect(forEachKeys(cache)).To.Equal([]string{"1"}) cache.Set("2", 2, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeys(cache)).To.Equal([]string{"1", "2"}) cache.Set("3", 3, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeys(cache)).To.Equal([]string{"1", "2", "3"}) cache.Set("4", 4, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeys(cache)).To.Equal([]string{"2", "3", "4"}) cache.Set("stop", 5, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeys(cache)).Not.To.Contain("stop") cache.Set("6", 6, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeys(cache)).Not.To.Contain("stop") } diff --git a/item.go b/item.go index 0226129..18d8203 100644 --- a/item.go +++ b/item.go @@ -2,6 +2,7 @@ package ccache import ( "container/list" + "fmt" "sync/atomic" "time" ) @@ -105,3 +106,13 @@ func (i *Item) Expires() time.Time { func (i *Item) Extend(duration time.Duration) { atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano()) } + +// String returns a string representation of the Item. This includes the default string +// representation of its Value(), as implemented by fmt.Sprintf with "%v", but the exact +// format of the string should not be relied on; it is provided only for debugging +// purposes, and because otherwise including an Item in a call to fmt.Printf or +// fmt.Sprintf expression could cause fields of the Item to be read in a non-thread-safe +// way. +func (i *Item) String() string { + return fmt.Sprintf("Item(%v)", i.value) +} diff --git a/layeredcache.go b/layeredcache.go index 3ffaf0d..a516c7a 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -183,9 +183,13 @@ func (c *LayeredCache) Stop() { // Gets the number of items removed from the cache due to memory pressure since // the last time GetDropped was called func (c *LayeredCache) GetDropped() int { - res := make(chan int) - c.control <- getDropped{res: res} - return <-res + return doGetDropped(c.control) +} + +// SyncUpdates waits until the cache has finished asynchronous state updates for any operations +// that were done by the current goroutine up to now. See Cache.SyncUpdates for details. +func (c *LayeredCache) SyncUpdates() { + doSyncUpdates(c.control) } // Sets a new max size. That can result in a GC being run if the new maxium size @@ -222,25 +226,31 @@ func (c *LayeredCache) promote(item *Item) { func (c *LayeredCache) worker() { defer close(c.control) dropped := 0 + promoteItem := func(item *Item) { + if c.doPromote(item) && c.size > c.maxSize { + dropped += c.gc() + } + } + deleteItem := func(item *Item) { + if item.element == nil { + atomic.StoreInt32(&item.promotions, -2) + } else { + c.size -= item.size + if c.onDelete != nil { + c.onDelete(item) + } + c.list.Remove(item.element) + } + } for { select { case item, ok := <-c.promotables: if ok == false { return } - if c.doPromote(item) && c.size > c.maxSize { - dropped += c.gc() - } + promoteItem(item) case item := <-c.deletables: - if item.element == nil { - atomic.StoreInt32(&item.promotions, -2) - } else { - c.size -= item.size - if c.onDelete != nil { - c.onDelete(item) - } - c.list.Remove(item.element) - } + deleteItem(item) case control := <-c.control: switch msg := control.(type) { case getDropped: @@ -258,6 +268,10 @@ func (c *LayeredCache) worker() { c.size = 0 c.list = list.New() msg.done <- struct{}{} + case syncWorker: + doAllPendingPromotesAndDeletes(c.promotables, promoteItem, + c.deletables, deleteItem) + msg.done <- struct{}{} } } } diff --git a/layeredcache_test.go b/layeredcache_test.go index 390b4e4..a7492d5 100644 --- a/layeredcache_test.go +++ b/layeredcache_test.go @@ -139,9 +139,9 @@ func (_ *LayeredCacheTests) OnDeleteCallbackCalled() { cache.Set("spice", "must", "value-b", time.Minute) cache.Set("leto", "sister", "ghanima", time.Minute) - time.Sleep(time.Millisecond * 10) // Run once to init + cache.SyncUpdates() cache.Delete("spice", "flow") - time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items + cache.SyncUpdates() Expect(cache.Get("spice", "flow")).To.Equal(nil) Expect(cache.Get("spice", "must").Value()).To.Equal("value-b") @@ -171,7 +171,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() { } cache.Set("xx", "b", 9001, time.Minute) //let the items get promoted (and added to our list) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcLayeredCache(cache) Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "b").Value()).To.Equal(9001) @@ -185,9 +185,9 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() { for i := 0; i < 500; i++ { cache.Set(strconv.Itoa(i), "a", i, time.Minute) } - time.Sleep(time.Millisecond * 10) //run the worker once to init the list + cache.SyncUpdates() cache.Get("9", "a") - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcLayeredCache(cache) Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("10", "a")).To.Equal(nil) @@ -201,7 +201,7 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() { cache.Set(strconv.Itoa(i), "a", i, time.Minute) } item1 := cache.TrackingGet("1", "a") - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() gcLayeredCache(cache) Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a").Value()).To.Equal(1) @@ -219,7 +219,7 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() { cache.Set(strconv.Itoa(i), "a", i, time.Minute) } cache.Set("xx", "b", 9001, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil) @@ -236,7 +236,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() { cache.Set(strconv.Itoa(i), "a", i, time.Minute) } cache.SetMaxSize(3) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(2) Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil) @@ -245,7 +245,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() { Expect(cache.Get("4", "a").Value()).To.Equal(4) cache.Set("5", "a", 5, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(1) Expect(cache.Get("2", "a")).To.Equal(nil) Expect(cache.Get("3", "a").Value()).To.Equal(3) @@ -254,7 +254,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() { cache.SetMaxSize(10) cache.Set("6", "a", 6, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.GetDropped()).To.Equal(0) Expect(cache.Get("3", "a").Value()).To.Equal(3) Expect(cache.Get("4", "a").Value()).To.Equal(4) @@ -267,7 +267,7 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFullBySizer() { for i := 0; i < 7; i++ { cache.Set("pri", strconv.Itoa(i), &SizedItem{i, 2}, time.Minute) } - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(cache.Get("pri", "0")).To.Equal(nil) Expect(cache.Get("pri", "1")).To.Equal(nil) Expect(cache.Get("pri", "2")).To.Equal(nil) @@ -279,20 +279,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() { cache := Layered(Configure()) cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 6) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 7) cache.Delete("pri", "b") - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() checkLayeredSize(cache, 5) } @@ -302,7 +302,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute) cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Replace("sec", "3", &SizedItem{1, 2}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 6) } @@ -312,15 +312,15 @@ func (_ LayeredCacheTests) ReplaceChangesSize() { cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute) cache.Replace("pri", "2", &SizedItem{1, 2}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 4) cache.Replace("pri", "2", &SizedItem{1, 1}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 3) cache.Replace("pri", "2", &SizedItem{1, 3}) - time.Sleep(time.Millisecond * 5) + cache.SyncUpdates() checkLayeredSize(cache, 5) } @@ -332,24 +332,24 @@ func (_ LayeredCacheTests) EachFunc() { Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a"}) cache.Set("1", "b", 2, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a", "b"}) cache.Set("1", "c", 3, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a", "b", "c"}) cache.Set("1", "d", 4, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"b", "c", "d"}) // iteration is non-deterministic, all we know for sure is "stop" should not be in there cache.Set("1", "stop", 5, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeysLayered(cache, "1")).Not.To.Contain("stop") cache.Set("1", "e", 6, time.Minute) - time.Sleep(time.Millisecond * 10) + cache.SyncUpdates() Expect(forEachKeysLayered(cache, "1")).Not.To.Contain("stop") }