From 890bb18dbfd62dbdd383c24297df2073b8f46fc1 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Fri, 28 Feb 2014 20:10:42 +0800 Subject: [PATCH] The cache can now do reference counting so that the LRU algorithm is aware of long-lived objects and won't clean them up. Oftentimes, the value returned from a cache hit is short-lived. As a silly example: func GetUser(response http.ResponseWriter) { user := cache.Get("user:1") response.Write(serialize(user)) } It's fine if the cache's GC cleans up "user:1" while the user variable has a reference to the object: the cache's reference is removed and the real GC will clean it up at some point after the user variable falls out of scope. However, what if user is long-lived? Possibly stored as a reference to another cached object? Normally (without this commit) the next time you call cache.Get("user:1"), you'll get a miss and will need to refetch the object, even though the original user object is still somewhere in memory - you just lost your reference to it from the cache. By enabling the Track() configuration flag, and calling TrackingGet() (instead of Get), the cache will track that the object is in-use and won't GC it, even if there's great memory pressure (what's the point? Something else is holding on to it anyway). Calling item.Release() will decrement the number of references. When the count is 0, the item can be pruned from the cache. 
The returned value is a TrackedItem which exposes: - Value() interface{} (to get the actual cached value) - Release() to release the item back in the cache --- bucket.go | 64 +++++++------- bucket_test.go | 68 +++++++-------- cache.go | 217 +++++++++++++++++++++++++++-------------------- cache_test.go | 49 +++++++++++ configuration.go | 59 +++++++------ item.go | 59 +++++++++---- item_test.go | 12 +-- 7 files changed, 321 insertions(+), 207 deletions(-) create mode 100644 cache_test.go diff --git a/bucket.go b/bucket.go index 9d95b44..b93d51e 100644 --- a/bucket.go +++ b/bucket.go @@ -1,53 +1,53 @@ package ccache import ( - "sync" - "time" + "sync" + "time" ) type Bucket struct { - sync.RWMutex - lookup map[string]*Item + sync.RWMutex + lookup map[string]*Item } func (b *Bucket) get(key string) *Item { - b.RLock() - defer b.RUnlock() - return b.lookup[key] + b.RLock() + defer b.RUnlock() + return b.lookup[key] } func (b *Bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool) { - expires := time.Now().Add(duration) - b.Lock() - defer b.Unlock() - if existing, exists := b.lookup[key]; exists { - existing.Lock() - existing.value = value - existing.expires = expires - existing.Unlock() - return existing, false - } - item := newItem(key, value, expires) - b.lookup[key] = item - return item, true + expires := time.Now().Add(duration) + b.Lock() + defer b.Unlock() + if existing, exists := b.lookup[key]; exists { + existing.Lock() + existing.value = value + existing.expires = expires + existing.Unlock() + return existing, false + } + item := newItem(key, value, expires) + b.lookup[key] = item + return item, true } func (b *Bucket) delete(key string) { - b.Lock() - defer b.Unlock() - delete(b.lookup, key) + b.Lock() + defer b.Unlock() + delete(b.lookup, key) } -func (b *Bucket) getAndDelete(key string) *Item{ - b.Lock() - defer b.Unlock() - item := b.lookup[key] - delete(b.lookup, key) - return item +func (b *Bucket) getAndDelete(key string) *Item 
{ + b.Lock() + defer b.Unlock() + item := b.lookup[key] + delete(b.lookup, key) + return item } func (b *Bucket) clear() { - b.Lock() - defer b.Unlock() - b.lookup = make(map[string]*Item) + b.Lock() + defer b.Unlock() + b.lookup = make(map[string]*Item) } diff --git a/bucket_test.go b/bucket_test.go index 6b34360..23d086f 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -1,64 +1,64 @@ package ccache import ( - "time" - "testing" - "github.com/viki-org/gspec" + "github.com/karlseguin/gspec" + "testing" + "time" ) func TestGetMissFromBucket(t *testing.T) { - bucket := testBucket() - gspec.New(t).Expect(bucket.get("invalid")).ToBeNil() + bucket := testBucket() + gspec.New(t).Expect(bucket.get("invalid")).ToBeNil() } func TestGetHitFromBucket(t *testing.T) { - bucket := testBucket() - item := bucket.get("power") - assertValue(t, item, "9000") + bucket := testBucket() + item := bucket.get("power") + assertValue(t, item, "9000") } func TestDeleteItemFromBucket(t *testing.T) { - bucket := testBucket() - bucket.delete("power") - gspec.New(t).Expect(bucket.get("power")).ToBeNil() + bucket := testBucket() + bucket.delete("power") + gspec.New(t).Expect(bucket.get("power")).ToBeNil() } func TestSetsANewBucketItem(t *testing.T) { - spec := gspec.New(t) - bucket := testBucket() - item, new := bucket.set("spice", TestValue("flow"), time.Minute) - assertValue(t, item, "flow") - item = bucket.get("spice") - assertValue(t, item, "flow") - spec.Expect(new).ToEqual(true) + spec := gspec.New(t) + bucket := testBucket() + item, new := bucket.set("spice", TestValue("flow"), time.Minute) + assertValue(t, item, "flow") + item = bucket.get("spice") + assertValue(t, item, "flow") + spec.Expect(new).ToEqual(true) } func TestSetsAnExistingItem(t *testing.T) { - spec := gspec.New(t) - bucket := testBucket() - item, new := bucket.set("power", TestValue("9002"), time.Minute) - assertValue(t, item, "9002") - item = bucket.get("power") - assertValue(t, item, "9002") - 
spec.Expect(new).ToEqual(false) + spec := gspec.New(t) + bucket := testBucket() + item, new := bucket.set("power", TestValue("9002"), time.Minute) + assertValue(t, item, "9002") + item = bucket.get("power") + assertValue(t, item, "9002") + spec.Expect(new).ToEqual(false) } func testBucket() *Bucket { - b := &Bucket{lookup: make(map[string]*Item),} - b.lookup["power"] = &Item{ - key: "power", - value: TestValue("9000"), - } - return b + b := &Bucket{lookup: make(map[string]*Item)} + b.lookup["power"] = &Item{ + key: "power", + value: TestValue("9000"), + } + return b } func assertValue(t *testing.T, item *Item, expected string) { - value := item.value.(TestValue) - gspec.New(t).Expect(value).ToEqual(TestValue(expected)) + value := item.value.(TestValue) + gspec.New(t).Expect(value).ToEqual(TestValue(expected)) } type TestValue string func (v TestValue) Expires() time.Time { - return time.Now() + return time.Now() } diff --git a/cache.go b/cache.go index 5e4290d..c447305 100644 --- a/cache.go +++ b/cache.go @@ -1,139 +1,172 @@ +// An LRU cached aimed at high concurrency package ccache import ( - "time" - "runtime" - "hash/fnv" - "container/list" + "container/list" + "hash/fnv" + "runtime" + "sync/atomic" + "time" ) type Cache struct { - *Configuration - list *list.List - buckets []*Bucket - bucketCount uint32 - deletables chan *Item - promotables chan *Item + *Configuration + list *list.List + buckets []*Bucket + bucketCount uint32 + deletables chan *Item + promotables chan *Item } func New(config *Configuration) *Cache { - c := &Cache{ - list: list.New(), - Configuration: config, - bucketCount: uint32(config.buckets), - buckets: make([]*Bucket, config.buckets), - deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), - } - for i := 0; i < config.buckets; i++ { - c.buckets[i] = &Bucket{ - lookup: make(map[string]*Item), - } - } - go c.worker() - return c + c := &Cache{ + list: list.New(), + Configuration: config, + 
bucketCount: uint32(config.buckets), + buckets: make([]*Bucket, config.buckets), + deletables: make(chan *Item, config.deleteBuffer), + promotables: make(chan *Item, config.promoteBuffer), + } + for i := 0; i < config.buckets; i++ { + c.buckets[i] = &Bucket{ + lookup: make(map[string]*Item), + } + } + go c.worker() + return c } func (c *Cache) Get(key string) interface{} { - bucket := c.bucket(key) - item := bucket.get(key) - if item == nil { return nil } - if item.expires.Before(time.Now()) { - c.deleteItem(bucket, item) - return nil - } - c.conditionalPromote(item) - return item.value + if item := c.get(key); item != nil { + return item.value + } + return nil +} + +func (c *Cache) TrackingGet(key string) TrackedItem { + item := c.get(key) + if item == nil { + return NilTracked + } + item.track() + return item +} + +func (c *Cache) get(key string) *Item { + bucket := c.bucket(key) + item := bucket.get(key) + if item == nil { + return nil + } + if item.expires.Before(time.Now()) { + c.deleteItem(bucket, item) + return nil + } + c.conditionalPromote(item) + return item } func (c *Cache) Set(key string, value interface{}, duration time.Duration) { - item, new := c.bucket(key).set(key, value, duration) - if new { - c.promote(item) - } else { - c.conditionalPromote(item) - } + item, new := c.bucket(key).set(key, value, duration) + if new { + c.promote(item) + } else { + c.conditionalPromote(item) + } } func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) { - item := c.Get(key) - if item != nil { return item, nil } - value, err := fetch() - if err == nil { - c.Set(key, value, duration) - } - return value, err + item := c.Get(key) + if item != nil { + return item, nil + } + value, err := fetch() + if err == nil { + c.Set(key, value, duration) + } + return value, err } func (c *Cache) Delete(key string) { - item := c.bucket(key).getAndDelete(key) - if item != nil { - c.deletables <- item - } + item := 
c.bucket(key).getAndDelete(key) + if item != nil { + c.deletables <- item + } } //this isn't thread safe. It's meant to be called from non-concurrent tests func (c *Cache) Clear() { - for _, bucket := range c.buckets { - bucket.clear() - } - c.list = list.New() + for _, bucket := range c.buckets { + bucket.clear() + } + c.list = list.New() } func (c *Cache) deleteItem(bucket *Bucket, item *Item) { - bucket.delete(item.key) //stop othe GETs from getting it - c.deletables <- item + bucket.delete(item.key) //stop othe GETs from getting it + c.deletables <- item } func (c *Cache) bucket(key string) *Bucket { - h := fnv.New32a() - h.Write([]byte(key)) - index := h.Sum32() % c.bucketCount - return c.buckets[index] + h := fnv.New32a() + h.Write([]byte(key)) + index := h.Sum32() % c.bucketCount + return c.buckets[index] } func (c *Cache) conditionalPromote(item *Item) { - if item.shouldPromote(c.getsPerPromote) == false { return } - c.promote(item) + if item.shouldPromote(c.getsPerPromote) == false { + return + } + c.promote(item) } func (c *Cache) promote(item *Item) { - c.promotables <- item + c.promotables <- item } func (c *Cache) worker() { - ms := new(runtime.MemStats) - for { - select { - case item := <- c.promotables: - wasNew := c.doPromote(item) - if wasNew == false { continue } - runtime.ReadMemStats(ms) - if ms.HeapAlloc > c.size { c.gc() } - case item := <- c.deletables: - c.list.Remove(item.element) - } - } + ms := new(runtime.MemStats) + for { + select { + case item := <-c.promotables: + if wasNew := c.doPromote(item); wasNew == false { + continue + } + runtime.ReadMemStats(ms) + if ms.HeapAlloc > c.size { + c.gc() + } + case item := <-c.deletables: + c.list.Remove(item.element) + } + } } func (c *Cache) doPromote(item *Item) bool { - item.Lock() - defer item.Unlock() - item.promotions = 0 - if item.element != nil { //not a new item - c.list.MoveToFront(item.element) - return false - } - item.element = c.list.PushFront(item) - return true + item.Lock() + 
defer item.Unlock() + item.promotions = 0 + if item.element != nil { //not a new item + c.list.MoveToFront(item.element) + return false + } + item.element = c.list.PushFront(item) + return true } func (c *Cache) gc() { - for i := 0; i < c.itemsToPrune; i++ { - element := c.list.Back() - if element == nil { return } - item := element.Value.(*Item) - c.bucket(item.key).delete(item.key) - c.list.Remove(element) - } + element := c.list.Back() + for i := 0; i < c.itemsToPrune; i++ { + if element == nil { + return + } + prev := element.Prev() + item := element.Value.(*Item) + if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { + c.bucket(item.key).delete(item.key) + c.list.Remove(element) + } + element = prev + } } diff --git a/cache_test.go b/cache_test.go new file mode 100644 index 0000000..f62e16c --- /dev/null +++ b/cache_test.go @@ -0,0 +1,49 @@ +package ccache + +import ( + "github.com/karlseguin/gspec" + "testing" + "strconv" + "time" +) + +func TestGCsTheOldestItems(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10)) + for i := 0; i < 500; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + cache.gc() + spec.Expect(cache.Get("9")).ToBeNil() + spec.Expect(cache.Get("10").(int)).ToEqual(10) +} + +func TestPromotedItemsDontGetPruned(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10).GetsPerPromote(1)) + for i := 0; i < 500; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + cache.Get("9") + time.Sleep(time.Millisecond * 10) + cache.gc() + spec.Expect(cache.Get("9").(int)).ToEqual(9) + spec.Expect(cache.Get("10")).ToBeNil() + spec.Expect(cache.Get("11").(int)).ToEqual(11) +} + +func TestTrackerDoesNotCleanupHeldInstance(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10).Track()) + for i := 0; i < 10; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + item := cache.TrackingGet("0") + time.Sleep(time.Millisecond * 10) + cache.gc() + 
spec.Expect(cache.Get("0").(int)).ToEqual(0) + spec.Expect(cache.Get("1")).ToBeNil() + item.Release() + cache.gc() + spec.Expect(cache.Get("0")).ToBeNil() +} diff --git a/configuration.go b/configuration.go index d6cef98..f38e855 100644 --- a/configuration.go +++ b/configuration.go @@ -1,51 +1,58 @@ package ccache type Configuration struct { - size uint64 - buckets int - itemsToPrune int - deleteBuffer int - promoteBuffer int - getsPerPromote int32 + size uint64 + buckets int + itemsToPrune int + deleteBuffer int + promoteBuffer int + getsPerPromote int32 + tracking bool } func Configure() *Configuration { - return &Configuration { - buckets: 64, - itemsToPrune: 500, - deleteBuffer: 1024, - getsPerPromote: 10, - promoteBuffer: 1024, - size: 500 * 1024 * 1024, - } + return &Configuration{ + buckets: 64, + itemsToPrune: 500, + deleteBuffer: 1024, + getsPerPromote: 10, + promoteBuffer: 1024, + size: 500 * 1024 * 1024, + tracking: false, + } } func (c *Configuration) Size(bytes uint64) *Configuration { - c.size = bytes - return c + c.size = bytes + return c } func (c *Configuration) Buckets(count int) *Configuration { - c.buckets = count - return c + c.buckets = count + return c } func (c *Configuration) ItemsToPrune(count int) *Configuration { - c.itemsToPrune = count - return c + c.itemsToPrune = count + return c } func (c *Configuration) PromoteBuffer(size int) *Configuration { - c.promoteBuffer = size - return c + c.promoteBuffer = size + return c } func (c *Configuration) DeleteBuffer(size int) *Configuration { - c.deleteBuffer = size - return c + c.deleteBuffer = size + return c } func (c *Configuration) GetsPerPromote(count int) *Configuration { - c.getsPerPromote = int32(count) - return c + c.getsPerPromote = int32(count) + return c +} + +func (c *Configuration) Track() *Configuration { + c.tracking = true + return c } diff --git a/item.go b/item.go index f677ef7..80c1028 100644 --- a/item.go +++ b/item.go @@ -1,30 +1,55 @@ package ccache import ( - "sync" - 
"time" - "sync/atomic" - "container/list" + "container/list" + "sync" + "sync/atomic" + "time" ) +type TrackedItem interface { + Value() interface{} + Release() +} + +type nilItem struct{} + +func (n *nilItem) Value() interface{} { return nil } +func (n *nilItem) Release() {} + +var NilTracked = new(nilItem) + type Item struct { - key string - sync.RWMutex - promotions int32 - expires time.Time - value interface{} - element *list.Element + key string + sync.RWMutex + promotions int32 + refCount int32 + expires time.Time + value interface{} + element *list.Element } func newItem(key string, value interface{}, expires time.Time) *Item { - return &Item{ - key: key, - value: value, - promotions: -1, - expires: expires, - } + return &Item{ + key: key, + value: value, + promotions: -1, + expires: expires, + } } func (i *Item) shouldPromote(getsPerPromote int32) bool { - return atomic.AddInt32(&i.promotions, 1) == getsPerPromote + return atomic.AddInt32(&i.promotions, 1) == getsPerPromote +} + +func (i *Item) Value() interface{} { + return i.value +} + +func (i *Item) track() { + atomic.AddInt32(&i.refCount, 1) +} + +func (i *Item) Release() { + atomic.AddInt32(&i.refCount, -1) } diff --git a/item_test.go b/item_test.go index eda1f58..a0c6e49 100644 --- a/item_test.go +++ b/item_test.go @@ -1,13 +1,13 @@ package ccache import ( - "testing" - "github.com/viki-org/gspec" + "github.com/karlseguin/gspec" + "testing" ) func TestItemPromotability(t *testing.T) { - spec := gspec.New(t) - item := &Item{promotions: 4} - spec.Expect(item.shouldPromote(5)).ToEqual(true) - spec.Expect(item.shouldPromote(5)).ToEqual(false) + spec := gspec.New(t) + item := &Item{promotions: 4} + spec.Expect(item.shouldPromote(5)).ToEqual(true) + spec.Expect(item.shouldPromote(5)).ToEqual(false) }