add SyncUpdates method to synchronize with worker thread, and use it in tests

This commit is contained in:
Eli Bishop
2021-03-18 16:09:30 -07:00
parent ae1872d700
commit c1fb5be323
5 changed files with 174 additions and 70 deletions

View File

@@ -13,6 +13,7 @@ import (
type getDropped struct {
res chan int
}
type setMaxSize struct {
size int64
}
@@ -21,6 +22,10 @@ type clear struct {
done chan struct{}
}
type syncWorker struct {
done chan struct{}
}
type Cache struct {
*Configuration
list *list.List
@@ -175,11 +180,38 @@ func (c *Cache) Stop() {
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *Cache) GetDropped() int {
return doGetDropped(c.control)
}
func doGetDropped(controlCh chan<- interface{}) int {
res := make(chan int)
c.control <- getDropped{res: res}
controlCh <- getDropped{res: res}
return <-res
}
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now.
//
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
// goroutine that updates its internal data structures asynchronously. This means that the
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
// application code will not care about this, but especially in a test scenario you may want to
// be able to know when the worker has caught up.
//
// This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
func (c *Cache) SyncUpdates() {
doSyncUpdates(c.control)
}
func doSyncUpdates(controlCh chan<- interface{}) {
done := make(chan struct{})
controlCh <- syncWorker{done: done}
<-done
}
// Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size
func (c *Cache) SetMaxSize(size int64) {
@@ -224,15 +256,18 @@ func (c *Cache) promote(item *Item) {
func (c *Cache) worker() {
defer close(c.control)
dropped := 0
promoteItem := func(item *Item) {
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
}
for {
select {
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-c.control:
@@ -252,6 +287,10 @@ func (c *Cache) worker() {
c.size = 0
c.list = list.New()
msg.done <- struct{}{}
case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
}
@@ -268,6 +307,39 @@ drain:
}
}
// This method is used to implement SyncUpdates. It simply receives and processes as many
// items as it can receive from the promotables and deletables channels immediately without
// blocking. If some other goroutine sends an item on either channel after this method has
// finished receiving, that's OK, because SyncUpdates only guarantees processing of values
// that were already sent by the same goroutine.
func doAllPendingPromotesAndDeletes(
promotables <-chan *Item,
promoteFn func(*Item),
deletables <-chan *Item,
deleteFn func(*Item),
) {
doAllPromotes:
for {
select {
case item := <-promotables:
if item != nil {
promoteFn(item)
}
default:
break doAllPromotes
}
}
doAllDeletes:
for {
select {
case item := <-deletables:
deleteFn(item)
default:
break doAllDeletes
}
}
}
func (c *Cache) doDelete(item *Item) {
if item.element == nil {
item.promotions = -2

View File

@@ -18,6 +18,7 @@ func Test_Cache(t *testing.T) {
func (_ CacheTests) DeletesAValue() {
cache := New(Configure())
defer cache.Stop()
Expect(cache.ItemCount()).To.Equal(0)
cache.Set("spice", "flow", time.Minute)
@@ -32,6 +33,7 @@ func (_ CacheTests) DeletesAValue() {
func (_ CacheTests) DeletesAPrefix() {
cache := New(Configure())
defer cache.Stop()
Expect(cache.ItemCount()).To.Equal(0)
cache.Set("aaa", "1", time.Minute)
@@ -55,6 +57,7 @@ func (_ CacheTests) DeletesAPrefix() {
func (_ CacheTests) DeletesAFunc() {
cache := New(Configure())
defer cache.Stop()
Expect(cache.ItemCount()).To.Equal(0)
cache.Set("a", 1, time.Minute)
@@ -91,12 +94,14 @@ func (_ CacheTests) OnDeleteCallbackCalled() {
}
cache := New(Configure().OnDelete(onDeleteFn))
defer cache.Stop()
cache.Set("spice", "flow", time.Minute)
cache.Set("worm", "sand", time.Minute)
time.Sleep(time.Millisecond * 10) // Run once to init
cache.SyncUpdates() // wait for worker to pick up preceding updates
cache.Delete("spice")
time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items
cache.SyncUpdates()
Expect(cache.Get("spice")).To.Equal(nil)
Expect(cache.Get("worm").Value()).To.Equal("sand")
@@ -105,6 +110,7 @@ func (_ CacheTests) OnDeleteCallbackCalled() {
func (_ CacheTests) FetchesExpiredItems() {
cache := New(Configure())
defer cache.Stop()
fn := func() (interface{}, error) { return "moo-moo", nil }
cache.Set("beef", "moo", time.Second*-1)
@@ -116,11 +122,11 @@ func (_ CacheTests) FetchesExpiredItems() {
func (_ CacheTests) GCsTheOldestItems() {
cache := New(Configure().ItemsToPrune(10))
defer cache.Stop()
for i := 0; i < 500; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
//let the items get promoted (and added to our list)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcCache(cache)
Expect(cache.Get("9")).To.Equal(nil)
Expect(cache.Get("10").Value()).To.Equal(10)
@@ -129,12 +135,13 @@ func (_ CacheTests) GCsTheOldestItems() {
func (_ CacheTests) PromotedItemsDontGetPruned() {
cache := New(Configure().ItemsToPrune(10).GetsPerPromote(1))
defer cache.Stop()
for i := 0; i < 500; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
time.Sleep(time.Millisecond * 10) //run the worker once to init the list
cache.SyncUpdates()
cache.Get("9")
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcCache(cache)
Expect(cache.Get("9").Value()).To.Equal(9)
Expect(cache.Get("10")).To.Equal(nil)
@@ -148,7 +155,7 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
item1 := cache.TrackingGet("1")
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcCache(cache)
Expect(cache.Get("0").Value()).To.Equal(0)
Expect(cache.Get("1").Value()).To.Equal(1)
@@ -171,7 +178,7 @@ func (_ CacheTests) RemovesOldestItemWhenFull() {
for i := 0; i < 7; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
Expect(cache.Get("2").Value()).To.Equal(2)
@@ -184,7 +191,7 @@ func (_ CacheTests) RemovesOldestItemWhenFullBySizer() {
for i := 0; i < 7; i++ {
cache.Set(strconv.Itoa(i), &SizedItem{i, 2}, time.Minute)
}
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
Expect(cache.Get("2")).To.Equal(nil)
@@ -198,19 +205,19 @@ func (_ CacheTests) SetUpdatesSizeOnDelta() {
cache := New(Configure())
cache.Set("a", &SizedItem{0, 2}, time.Minute)
cache.Set("b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 5)
cache.Set("b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 5)
cache.Set("b", &SizedItem{0, 4}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 6)
cache.Set("b", &SizedItem{0, 2}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 4)
cache.Delete("b")
time.Sleep(time.Millisecond * 100)
cache.SyncUpdates()
checkSize(cache, 2)
}
@@ -220,7 +227,7 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("2", &SizedItem{1, 2}, time.Minute)
cache.Set("3", &SizedItem{1, 2}, time.Minute)
cache.Replace("4", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 6)
}
@@ -230,15 +237,15 @@ func (_ CacheTests) ReplaceChangesSize() {
cache.Set("2", &SizedItem{1, 2}, time.Minute)
cache.Replace("2", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 4)
cache.Replace("2", &SizedItem{1, 1})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 3)
cache.Replace("2", &SizedItem{1, 3})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkSize(cache, 5)
}
@@ -248,7 +255,7 @@ func (_ CacheTests) ResizeOnTheFly() {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
cache.SetMaxSize(3)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(2)
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
@@ -257,7 +264,7 @@ func (_ CacheTests) ResizeOnTheFly() {
Expect(cache.Get("4").Value()).To.Equal(4)
cache.Set("5", 5, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(1)
Expect(cache.Get("2")).To.Equal(nil)
Expect(cache.Get("3").Value()).To.Equal(3)
@@ -266,7 +273,7 @@ func (_ CacheTests) ResizeOnTheFly() {
cache.SetMaxSize(10)
cache.Set("6", 6, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(0)
Expect(cache.Get("3").Value()).To.Equal(3)
Expect(cache.Get("4").Value()).To.Equal(4)
@@ -282,23 +289,23 @@ func (_ CacheTests) ForEachFunc() {
Expect(forEachKeys(cache)).To.Equal([]string{"1"})
cache.Set("2", 2, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeys(cache)).To.Equal([]string{"1", "2"})
cache.Set("3", 3, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeys(cache)).To.Equal([]string{"1", "2", "3"})
cache.Set("4", 4, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeys(cache)).To.Equal([]string{"2", "3", "4"})
cache.Set("stop", 5, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeys(cache)).Not.To.Contain("stop")
cache.Set("6", 6, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeys(cache)).Not.To.Contain("stop")
}

11
item.go
View File

@@ -2,6 +2,7 @@ package ccache
import (
"container/list"
"fmt"
"sync/atomic"
"time"
)
@@ -105,3 +106,13 @@ func (i *Item) Expires() time.Time {
func (i *Item) Extend(duration time.Duration) {
atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano())
}
// String returns a string representation of the Item. This includes the default string
// representation of its Value(), as implemented by fmt.Sprintf with "%v", but the exact
// format of the string should not be relied on; it is provided only for debugging
// purposes, and because otherwise including an Item in a call to fmt.Printf or
// fmt.Sprintf expression could cause fields of the Item to be read in a non-thread-safe
// way.
func (i *Item) String() string {
return fmt.Sprintf("Item(%v)", i.value)
}

View File

@@ -183,9 +183,13 @@ func (c *LayeredCache) Stop() {
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *LayeredCache) GetDropped() int {
res := make(chan int)
c.control <- getDropped{res: res}
return <-res
return doGetDropped(c.control)
}
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now. See Cache.SyncUpdates for details.
func (c *LayeredCache) SyncUpdates() {
doSyncUpdates(c.control)
}
// Sets a new max size. That can result in a GC being run if the new maxium size
@@ -222,16 +226,12 @@ func (c *LayeredCache) promote(item *Item) {
func (c *LayeredCache) worker() {
defer close(c.control)
dropped := 0
for {
select {
case item, ok := <-c.promotables:
if ok == false {
return
}
promoteItem := func(item *Item) {
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
case item := <-c.deletables:
}
deleteItem := func(item *Item) {
if item.element == nil {
atomic.StoreInt32(&item.promotions, -2)
} else {
@@ -241,6 +241,16 @@ func (c *LayeredCache) worker() {
}
c.list.Remove(item.element)
}
}
for {
select {
case item, ok := <-c.promotables:
if ok == false {
return
}
promoteItem(item)
case item := <-c.deletables:
deleteItem(item)
case control := <-c.control:
switch msg := control.(type) {
case getDropped:
@@ -258,6 +268,10 @@ func (c *LayeredCache) worker() {
c.size = 0
c.list = list.New()
msg.done <- struct{}{}
case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, deleteItem)
msg.done <- struct{}{}
}
}
}

View File

@@ -139,9 +139,9 @@ func (_ *LayeredCacheTests) OnDeleteCallbackCalled() {
cache.Set("spice", "must", "value-b", time.Minute)
cache.Set("leto", "sister", "ghanima", time.Minute)
time.Sleep(time.Millisecond * 10) // Run once to init
cache.SyncUpdates()
cache.Delete("spice", "flow")
time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items
cache.SyncUpdates()
Expect(cache.Get("spice", "flow")).To.Equal(nil)
Expect(cache.Get("spice", "must").Value()).To.Equal("value-b")
@@ -171,7 +171,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() {
}
cache.Set("xx", "b", 9001, time.Minute)
//let the items get promoted (and added to our list)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcLayeredCache(cache)
Expect(cache.Get("xx", "a")).To.Equal(nil)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
@@ -185,9 +185,9 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() {
for i := 0; i < 500; i++ {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
time.Sleep(time.Millisecond * 10) //run the worker once to init the list
cache.SyncUpdates()
cache.Get("9", "a")
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcLayeredCache(cache)
Expect(cache.Get("9", "a").Value()).To.Equal(9)
Expect(cache.Get("10", "a")).To.Equal(nil)
@@ -201,7 +201,7 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
item1 := cache.TrackingGet("1", "a")
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a").Value()).To.Equal(1)
@@ -219,7 +219,7 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
cache.Set("xx", "b", 9001, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.Get("xx", "a")).To.Equal(nil)
Expect(cache.Get("0", "a")).To.Equal(nil)
Expect(cache.Get("1", "a")).To.Equal(nil)
@@ -236,7 +236,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
cache.SetMaxSize(3)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(2)
Expect(cache.Get("0", "a")).To.Equal(nil)
Expect(cache.Get("1", "a")).To.Equal(nil)
@@ -245,7 +245,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() {
Expect(cache.Get("4", "a").Value()).To.Equal(4)
cache.Set("5", "a", 5, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(1)
Expect(cache.Get("2", "a")).To.Equal(nil)
Expect(cache.Get("3", "a").Value()).To.Equal(3)
@@ -254,7 +254,7 @@ func (_ LayeredCacheTests) ResizeOnTheFly() {
cache.SetMaxSize(10)
cache.Set("6", "a", 6, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.GetDropped()).To.Equal(0)
Expect(cache.Get("3", "a").Value()).To.Equal(3)
Expect(cache.Get("4", "a").Value()).To.Equal(4)
@@ -267,7 +267,7 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFullBySizer() {
for i := 0; i < 7; i++ {
cache.Set("pri", strconv.Itoa(i), &SizedItem{i, 2}, time.Minute)
}
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(cache.Get("pri", "0")).To.Equal(nil)
Expect(cache.Get("pri", "1")).To.Equal(nil)
Expect(cache.Get("pri", "2")).To.Equal(nil)
@@ -279,20 +279,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() {
cache := Layered(Configure())
cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 6)
cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute)
cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 7)
cache.Delete("pri", "b")
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
checkLayeredSize(cache, 5)
}
@@ -302,7 +302,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute)
cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute)
cache.Replace("sec", "3", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 6)
}
@@ -312,15 +312,15 @@ func (_ LayeredCacheTests) ReplaceChangesSize() {
cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute)
cache.Replace("pri", "2", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 4)
cache.Replace("pri", "2", &SizedItem{1, 1})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 3)
cache.Replace("pri", "2", &SizedItem{1, 3})
time.Sleep(time.Millisecond * 5)
cache.SyncUpdates()
checkLayeredSize(cache, 5)
}
@@ -332,24 +332,24 @@ func (_ LayeredCacheTests) EachFunc() {
Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a"})
cache.Set("1", "b", 2, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a", "b"})
cache.Set("1", "c", 3, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"a", "b", "c"})
cache.Set("1", "d", 4, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeysLayered(cache, "1")).To.Equal([]string{"b", "c", "d"})
// iteration is non-deterministic, all we know for sure is "stop" should not be in there
cache.Set("1", "stop", 5, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeysLayered(cache, "1")).Not.To.Contain("stop")
cache.Set("1", "e", 6, time.Minute)
time.Sleep(time.Millisecond * 10)
cache.SyncUpdates()
Expect(forEachKeysLayered(cache, "1")).Not.To.Contain("stop")
}