47 Commits

Author SHA1 Message Date
Karl Seguin
d9aec58960 add GetDropped documentation 2020-02-16 11:54:07 +08:00
Karl Seguin
1a257a89d6 add GetDropped function 2020-02-05 22:05:05 +08:00
Karl Seguin
78289f8f0b Merge pull request #38 from karlseguin/DeletePrefix
Delete prefix
2020-01-23 13:03:42 +08:00
Karl Seguin
79f9dcde21 fewer defers, document DeletePrefix 2020-01-23 12:55:55 +08:00
Karl Seguin
04261a5282 Merge branch 'master' into DeletePrefix 2020-01-23 12:50:53 +08:00
Karl Seguin
569ae60338 Merge pull request #37 from aporeto-inc/pin-expect
fixed/module: expect needs to be on master
2020-01-23 11:44:58 +08:00
Antoine Mercadal
048ac0669f fixed/module: expect needs to be on master 2020-01-22 19:35:19 -08:00
Karl Seguin
2ff4136636 Merge pull request #36 from aporeto-inc/go-modules
migrate to go modules
2020-01-23 11:19:07 +08:00
Antoine Mercadal
f79de0e254 migrate to go modules 2020-01-22 19:16:52 -08:00
Karl Seguin
356e164dd5 preliminary work on DeletePrefix 2020-01-23 10:27:12 +08:00
Karl Seguin
2ff889bcae document Clear 2020-01-23 10:04:47 +08:00
Karl Seguin
46ec5d2257 explicitly state the thread-safety nature of the library 2020-01-23 09:38:19 +08:00
Karl Seguin
ec06cd93a0 Merge pull request #28 from buglloc/bucket_tests
Fixed bucket tests
2019-02-23 22:26:40 +07:00
Andrew Krasichkov
8d8b062716 fixed bucket tests 2019-02-14 15:54:01 +03:00
Karl Seguin
3385784411 Add cache.ItemCount() int64 API 2019-01-26 12:33:50 +07:00
Karl Seguin
692cd618b2 guard access to item.promotions in LayeredCache, which was applied to Cache in 557d56ec6f 2018-12-27 22:54:50 +07:00
Karl Seguin
142396791e Merge pull request #22 from alexejk/gcOnDelete
Calling OnDelete from gc()
2018-11-26 20:29:43 +07:00
Alexej Kubarev
243f5c8219 Fixes #21. Calling OnDelete during gc() 2018-11-25 15:31:09 -08:00
Alexej Kubarev
7e55af0a9c Small routine lock file upgrade 2018-11-25 15:29:47 -08:00
Karl Seguin
a317416755 Merge pull request #16 from alexejk/onremove
Support for OnDelete() callback
2018-07-22 11:17:02 +07:00
Alexej Kubarev
7421e2d7b4 Adding support for OnDelete callback function
OnDelete will receive the item that is being processed for deletion, to support calling a cleanup function specific to the stored item
2018-07-16 18:20:17 +02:00
Alexej Kubarev
72059a01e9 Adding missing ignore file to ensure vendor folder is not checked in 2018-07-16 18:18:51 +02:00
Alexej Kubarev
00324cb2d2 Adding dep dependency management files to utilize vendoring 2018-07-16 18:18:02 +02:00
Karl Seguin
b425c9ca00 Merge pull request #11 from EdwardBetts/spelling
correct spelling mistake
2017-09-04 13:47:32 +07:00
Edward Betts
0d05fa8278 correct spelling mistake 2017-09-01 11:40:44 +01:00
Karl Seguin
3ba9789cfd Merge pull request #10 from heyitsanthony/test-races
fix data races in tests
2017-02-17 13:08:20 +07:00
Anthony Romano
b3c864ded7 cache: make Stop() synchronous and fix races in tests
worker goroutine running concurrently with tests would cause data race errors
when running tests with -race enabled.
2017-02-13 15:39:24 -08:00
Anthony Romano
c69270ce08 layeredcache: add Stop() and fix races in tests
worker goroutine running concurrently with tests would cause data race errors
when running tests with -race enabled.
2017-02-13 15:39:24 -08:00
Karl Seguin
12c7ffdc19 Merge pull request #9 from spicydog/patch-1
Fix an error in example "itemsToPrune"
2016-12-22 18:15:50 +07:00
spicydog
77679ba342 Fix an error in example "itemsToPrune"
itemsToPrune -> ItemsToPrune
2016-12-21 20:07:16 +10:00
Karl Seguin
a2d6215577 Merge pull request #8 from jdeppe-pivotal/master
Add a SecondaryCache which exposes the secondary part of a LayeredCache
2016-11-03 22:19:53 +07:00
Jens Deppe
a451d7262c Integrate feedback and upstream fixes
- Ensure correct locking in GetOrCreateSecondaryCache
- Fetch now returns a *Item
2016-11-01 23:53:22 -07:00
Jens Deppe
d2c2442186 Merge remote-tracking branch 'seguin/master' 2016-11-01 20:33:44 -07:00
Karl Seguin
8adbb5637b return *Item from layered cache fetch instead of interface{} 2016-11-02 09:34:09 +07:00
Jens Deppe
c1634a4d00 Add concept of a SecondaryCache which exposes the secondary part of a LayeredCache 2016-11-01 09:01:39 -07:00
Karl Seguin
2f6b517f7b Merge pull request #6 from HasMatthew/nanosecond_ttl
Use nanosecond-resolution TTL instead of second-resolution.
2016-07-07 20:03:45 -07:00
Matthew Dale
162d4e27ca Use nanosecond-resolution TTL instead of second-resolution. 2016-07-07 15:32:49 -07:00
Karl Seguin
ddcff8e624 Merge pull request #5 from dvdplm/master
Fetch does not return stale items
2016-02-05 22:16:17 +08:00
David Palm
3665b16e83 Better test 2016-02-05 14:34:58 +01:00
David Palm
d5307b40af Fetch does not return stale items 2016-02-03 16:07:59 +01:00
Karl Seguin
74754c77cc Partially fixing #3.
On close, drain the deletables channel (unblocking any waiting goroutines) and
close deletables. Like Gets and Sets against a now-closed promotables,
this means any subsequent Deletes from deletables will panic.

I'm still not sure that this is ccache's responsibility. If a client closes a DB
connection, we'd expect subsequent operations against the now-closed connection
to fail. My main problems with defer'ing a recover are:

1 - the performance overhead on every single get / set / delete
2 - not communicating with the caller that the requested operation is no longer
    valid.
2015-07-26 11:05:48 +08:00
Karl Seguin
bfa769c6b6 add Stop method to stop the background worker and make it possible for the GC to reap the object 2015-07-23 22:24:50 +08:00
Karl Seguin
41f1a3cfcb gonna be one of those days... 2015-01-07 08:12:17 +07:00
Karl Seguin
f9c7f14b7b Fetch's API wasn't usable. It returned different value types based on whether
the fetch was needed or not. It now behaves consistently (with itself and with
Get), returning an *Item.
2015-01-07 08:09:39 +07:00
Karl Seguin
6df1e24ae3 2 changes:
1 -
Previously, we determined if an item should be promoted in the main getter
thread. This required that we protect the item.promotions variable, as both
the getter and the worker were concurrently accessing it. This change pushes
the conditional promotion to the worker (from the getter's point of view, items
are always promoted). Since only the worker ever accesses .promotions, we no
longer must protect access to it.

2 -
The total size of the cache was being maintained by both the worker thread
and the calling code. This required that we protect access to cache.size. Now,
only the worker ever changes the size. While this simplifies much of the code,
it means that we can't easily replace an item (replacement either via Set or
Replace). A replacement now involves creating a new object and deleting the old
one (using the existing deletables and promotable infrastructure). The only
noticeable impact from this change is that, despite previous documentation,
Replace WILL cause the item to be promoted (but it still only does so if it
exists and it still doesn't extend the original TTL).
2014-12-28 11:11:32 +07:00
Karl Seguin
557d56ec6f guard all access to item.promotions 2014-12-28 10:35:20 +07:00
Karl Seguin
c75dcd4c12 link to rcache 2014-12-06 17:19:23 +07:00
16 changed files with 765 additions and 240 deletions

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
vendor/

bucket.go

@@ -1,8 +1,8 @@
package ccache
import (
"strings"
"sync"
"sync/atomic"
"time"
)
@@ -11,63 +11,77 @@ type bucket struct {
lookup map[string]*Item
}
func (b *bucket) itemCount() int {
b.RLock()
defer b.RUnlock()
return len(b.lookup)
}
func (b *bucket) get(key string) *Item {
b.RLock()
defer b.RUnlock()
return b.lookup[key]
}
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool, int64) {
expires := time.Now().Add(duration).Unix()
b.Lock()
defer b.Unlock()
if existing, exists := b.lookup[key]; exists {
existing.value = value
existing.expires = expires
d := int64(0)
if sized, ok := value.(Sized); ok {
newSize := sized.Size()
d = newSize - existing.size
if d != 0 {
atomic.StoreInt64(&existing.size, newSize)
}
}
return existing, false, int64(d)
}
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, *Item) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires)
b.lookup[key] = item
return item, true, int64(item.size)
}
func (b *bucket) replace(key string, value interface{}) (bool, int64) {
b.Lock()
defer b.Unlock()
existing, exists := b.lookup[key]
if exists == false {
return false, 0
}
d := int64(0)
if sized, ok := value.(Sized); ok {
newSize := sized.Size()
d = newSize - existing.size
if d != 0 {
atomic.StoreInt64(&existing.size, newSize)
}
}
existing.value = value
return true, d
existing := b.lookup[key]
b.lookup[key] = item
b.Unlock()
return item, existing
}
func (b *bucket) delete(key string) *Item {
b.Lock()
defer b.Unlock()
item := b.lookup[key]
delete(b.lookup, key)
b.Unlock()
return item
}
// This is an expensive operation, so we do what we can to optimize it and limit
// the impact it has on concurrent operations. Specifically, we:
// 1 - Do an initial iteration to collect matches. This allows us to do the
// "expensive" prefix check (on all values) using only a read-lock
// 2 - Do a second iteration, under write lock, for the matched results to do
// the actual deletion
// Also, this is the only place where the Bucket is aware of cache detail: the
// deletables channel. Passing it here lets us avoid iterating over matched items
// again in the cache. Further, we pass item to deletables BEFORE actually removing
// the item from the map. I'm pretty sure this is 100% fine, but it is unique.
// (We do this so that the write to the channel is under the read lock and not the
// write lock)
func (b *bucket) deletePrefix(prefix string, deletables chan *Item) int {
lookup := b.lookup
items := make([]*Item, 0, len(lookup)/10)
b.RLock()
for key, item := range lookup {
if strings.HasPrefix(key, prefix) {
deletables <- item
items = append(items, item)
}
}
b.RUnlock()
if len(items) == 0 {
// avoid the write lock if we can
return 0
}
b.Lock()
for _, item := range items {
delete(lookup, item.key)
}
b.Unlock()
return len(items)
}
func (b *bucket) clear() {
b.Lock()
defer b.Unlock()
b.lookup = make(map[string]*Item)
b.Unlock()
}

bucket_test.go

@@ -9,7 +9,7 @@ import (
type BucketTests struct {
}
func Tests_Bucket(t *testing.T) {
func Test_Bucket(t *testing.T) {
Expectify(new(BucketTests), t)
}
@@ -32,37 +32,20 @@ func (_ *BucketTests) DeleteItemFromBucket() {
func (_ *BucketTests) SetsANewBucketItem() {
bucket := testBucket()
item, new, d := bucket.set("spice", TestValue("flow"), time.Minute)
item, existing := bucket.set("spice", TestValue("flow"), time.Minute)
assertValue(item, "flow")
item = bucket.get("spice")
assertValue(item, "flow")
Expect(new).To.Equal(true)
Expect(d).To.Equal(1)
Expect(existing).To.Equal(nil)
}
func (_ *BucketTests) SetsAnExistingItem() {
bucket := testBucket()
item, new, d := bucket.set("power", TestValue("9002"), time.Minute)
assertValue(item, "9002")
item, existing := bucket.set("power", TestValue("9001"), time.Minute)
assertValue(item, "9001")
item = bucket.get("power")
assertValue(item, "9002")
Expect(new).To.Equal(false)
Expect(d).To.Equal(0)
}
func (_ *BucketTests) ReplaceDoesNothingIfKeyDoesNotExist() {
bucket := testBucket()
Expect(bucket.replace("power", TestValue("9002"))).To.Equal(false)
Expect(bucket.get("power")).To.Equal(nil)
}
func (_ *BucketTests) ReplaceReplacesThevalue() {
bucket := testBucket()
item, _, _ := bucket.set("power", TestValue("9002"), time.Minute)
Expect(bucket.replace("power", TestValue("9004"))).To.Equal(true)
Expect(item.Value().(string)).To.Equal("9004")
Expect(bucket.get("power").Value().(string)).To.Equal("9004")
//not sure how to test that the TTL hasn't changed, short of a sleep..
assertValue(item, "9001")
assertValue(existing, "9000")
}
func testBucket() *bucket {

183
cache.go

@@ -10,12 +10,15 @@ import (
type Cache struct {
*Configuration
list *list.List
size int64
buckets []*bucket
bucketMask uint32
deletables chan *Item
promotables chan *Item
list *list.List
size int64
buckets []*bucket
bucketMask uint32
deletables chan *Item
promotables chan *Item
donec chan struct{}
getDroppedReq chan struct{}
getDroppedRes chan int
}
// Create a new cache with the specified configuration
@@ -26,30 +29,45 @@ func New(config *Configuration) *Cache {
Configuration: config,
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
getDroppedReq: make(chan struct{}),
getDroppedRes: make(chan int),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &bucket{
lookup: make(map[string]*Item),
}
}
go c.worker()
c.restart()
return c
}
func (c *Cache) ItemCount() int {
count := 0
for _, b := range c.buckets {
count += b.itemCount()
}
return count
}
func (c *Cache) DeletePrefix(prefix string) int {
count := 0
for _, b := range c.buckets {
count += b.deletePrefix(prefix, c.deletables)
}
return count
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *Cache) Get(key string) *Item {
bucket := c.bucket(key)
item := bucket.get(key)
item := c.bucket(key).get(key)
if item == nil {
return nil
}
if item.expires > time.Now().Unix() {
c.conditionalPromote(item)
if item.expires > time.Now().UnixNano() {
c.promote(item)
}
return item
}
@@ -67,41 +85,34 @@ func (c *Cache) TrackingGet(key string) TrackedItem {
// Set the value in the cache for the specified duration
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
item, new, d := c.bucket(key).set(key, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
if d != 0 {
atomic.AddInt64(&c.size, d)
}
c.set(key, value, duration)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset item's TTL nor does it alter its position in the LRU
// Replace does not reset item's TTL
func (c *Cache) Replace(key string, value interface{}) bool {
exists, d := c.bucket(key).replace(key, value)
if d != 0 {
atomic.AddInt64(&c.size, d)
item := c.bucket(key).get(key)
if item == nil {
return false
}
return exists
c.Set(key, value, item.TTL())
return true
}
// Attempts to get the value from the cache and calls fetch on a miss.
// If fetch returns an error, no value is cached and the error is returned back
// to the caller.
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
// Attempts to get the value from the cache and calls fetch on a miss (missing
// or stale item). If fetch returns an error, no value is cached and the error
// is returned back to the caller.
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := c.Get(key)
if item != nil {
if item != nil && !item.Expired() {
return item, nil
}
value, err := fetch()
if err == nil {
c.Set(key, value, duration)
if err != nil {
return nil, err
}
return value, err
return c.set(key, value, duration), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
@@ -123,44 +134,93 @@ func (c *Cache) Clear() {
c.list = list.New()
}
// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic
func (c *Cache) Stop() {
close(c.promotables)
<-c.donec
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *Cache) GetDropped() int {
c.getDroppedReq <- struct{}{}
return <-c.getDroppedRes
}
func (c *Cache) restart() {
c.deletables = make(chan *Item, c.deleteBuffer)
c.promotables = make(chan *Item, c.promoteBuffer)
c.donec = make(chan struct{})
go c.worker()
}
func (c *Cache) deleteItem(bucket *bucket, item *Item) {
bucket.delete(item.key) //stop other GETs from getting it
c.deletables <- item
}
func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(key).set(key, value, duration)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
return item
}
func (c *Cache) bucket(key string) *bucket {
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *Cache) conditionalPromote(item *Item) {
if item.shouldPromote(c.getsPerPromote) == false {
return
}
c.promote(item)
}
func (c *Cache) promote(item *Item) {
c.promotables <- item
}
func (c *Cache) worker() {
defer close(c.donec)
dropped := 0
for {
select {
case item := <-c.promotables:
if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxSize {
c.gc()
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
case item := <-c.deletables:
atomic.AddInt64(&c.size, -item.size)
if item.element == nil {
item.promotions = -2
} else {
c.list.Remove(item.element)
}
c.doDelete(item)
case _ = <-c.getDroppedReq:
c.getDroppedRes <- dropped
dropped = 0
}
}
drain:
for {
select {
case item := <-c.deletables:
c.doDelete(item)
default:
close(c.deletables)
return
}
}
}
func (c *Cache) doDelete(item *Item) {
if item.element == nil {
item.promotions = -2
} else {
c.size -= item.size
if c.onDelete != nil {
c.onDelete(item)
}
c.list.Remove(item.element)
}
}
func (c *Cache) doPromote(item *Item) bool {
@@ -168,28 +228,39 @@ func (c *Cache) doPromote(item *Item) bool {
if item.promotions == -2 {
return false
}
item.promotions = 0
if item.element != nil { //not a new item
c.list.MoveToFront(item.element)
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.element)
item.promotions = 0
}
return false
}
c.size += item.size
item.element = c.list.PushFront(item)
return true
}
func (c *Cache) gc() {
func (c *Cache) gc() int {
dropped := 0
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
return dropped
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
atomic.AddInt64(&c.size, -item.size)
c.size -= item.size
c.list.Remove(element)
if c.onDelete != nil {
c.onDelete(item)
}
dropped += 1
item.promotions = -2
}
element = prev
}
return dropped
}

cache_test.go

@@ -1,10 +1,11 @@
package ccache
import (
. "github.com/karlseguin/expect"
"strconv"
"testing"
"time"
. "github.com/karlseguin/expect"
)
type CacheTests struct{}
@@ -15,11 +16,71 @@ func Test_Cache(t *testing.T) {
func (_ CacheTests) DeletesAValue() {
cache := New(Configure())
Expect(cache.ItemCount()).To.Equal(0)
cache.Set("spice", "flow", time.Minute)
cache.Set("worm", "sand", time.Minute)
Expect(cache.ItemCount()).To.Equal(2)
cache.Delete("spice")
Expect(cache.Get("spice")).To.Equal(nil)
Expect(cache.Get("worm").Value()).To.Equal("sand")
Expect(cache.ItemCount()).To.Equal(1)
}
func (_ CacheTests) DeletesAPrefix() {
cache := New(Configure())
Expect(cache.ItemCount()).To.Equal(0)
cache.Set("aaa", "1", time.Minute)
cache.Set("aab", "2", time.Minute)
cache.Set("aac", "3", time.Minute)
cache.Set("ac", "4", time.Minute)
cache.Set("z5", "7", time.Minute)
Expect(cache.ItemCount()).To.Equal(5)
Expect(cache.DeletePrefix("9a")).To.Equal(0)
Expect(cache.ItemCount()).To.Equal(5)
Expect(cache.DeletePrefix("aa")).To.Equal(3)
Expect(cache.Get("aaa")).To.Equal(nil)
Expect(cache.Get("aab")).To.Equal(nil)
Expect(cache.Get("aac")).To.Equal(nil)
Expect(cache.Get("ac").Value()).To.Equal("4")
Expect(cache.Get("z5").Value()).To.Equal("7")
Expect(cache.ItemCount()).To.Equal(2)
}
func (_ CacheTests) OnDeleteCallbackCalled() {
onDeleteFnCalled := false
onDeleteFn := func(item *Item) {
if item.key == "spice" {
onDeleteFnCalled = true
}
}
cache := New(Configure().OnDelete(onDeleteFn))
cache.Set("spice", "flow", time.Minute)
cache.Set("worm", "sand", time.Minute)
time.Sleep(time.Millisecond * 10) // Run once to init
cache.Delete("spice")
time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items
Expect(cache.Get("spice")).To.Equal(nil)
Expect(cache.Get("worm").Value()).To.Equal("sand")
Expect(onDeleteFnCalled).To.Equal(true)
}
func (_ CacheTests) FetchesExpiredItems() {
cache := New(Configure())
fn := func() (interface{}, error) { return "moo-moo", nil }
cache.Set("beef", "moo", time.Second*-1)
Expect(cache.Get("beef").Value()).To.Equal("moo")
out, _ := cache.Fetch("beef", time.Second, fn)
Expect(out.Value()).To.Equal("moo-moo")
}
func (_ CacheTests) GCsTheOldestItems() {
@@ -29,9 +90,10 @@ func (_ CacheTests) GCsTheOldestItems() {
}
//let the items get promoted (and added to our list)
time.Sleep(time.Millisecond * 10)
cache.gc()
gcCache(cache)
Expect(cache.Get("9")).To.Equal(nil)
Expect(cache.Get("10").Value()).To.Equal(10)
Expect(cache.ItemCount()).To.Equal(490)
}
func (_ CacheTests) PromotedItemsDontGetPruned() {
@@ -42,7 +104,7 @@ func (_ CacheTests) PromotedItemsDontGetPruned() {
time.Sleep(time.Millisecond * 10) //run the worker once to init the list
cache.Get("9")
time.Sleep(time.Millisecond * 10)
cache.gc()
gcCache(cache)
Expect(cache.Get("9").Value()).To.Equal(9)
Expect(cache.Get("10")).To.Equal(nil)
Expect(cache.Get("11").Value()).To.Equal(11)
@@ -55,16 +117,23 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() {
}
item := cache.TrackingGet("0")
time.Sleep(time.Millisecond * 10)
cache.gc()
gcCache(cache)
Expect(cache.Get("0").Value()).To.Equal(0)
Expect(cache.Get("1")).To.Equal(nil)
item.Release()
cache.gc()
gcCache(cache)
Expect(cache.Get("0")).To.Equal(nil)
}
func (_ CacheTests) RemovesOldestItemWhenFull() {
cache := New(Configure().MaxSize(5).ItemsToPrune(1))
onDeleteFnCalled := false
onDeleteFn := func(item *Item) {
if item.key == "0" {
onDeleteFnCalled = true
}
}
cache := New(Configure().MaxSize(5).ItemsToPrune(1).OnDelete(onDeleteFn))
for i := 0; i < 7; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
@@ -72,6 +141,8 @@ func (_ CacheTests) RemovesOldestItemWhenFull() {
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
Expect(cache.Get("2").Value()).To.Equal(2)
Expect(onDeleteFnCalled).To.Equal(true)
Expect(cache.ItemCount()).To.Equal(5)
}
func (_ CacheTests) RemovesOldestItemWhenFullBySizer() {
@@ -83,23 +154,30 @@ func (_ CacheTests) RemovesOldestItemWhenFullBySizer() {
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
Expect(cache.Get("2")).To.Equal(nil)
Expect(cache.Get("3").Value().(*SizedItem).id).To.Equal(3)
Expect(cache.Get("3")).To.Equal(nil)
Expect(cache.Get("4").Value().(*SizedItem).id).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(0)
}
func (_ CacheTests) SetUpdatesSizeOnDelta() {
cache := New(Configure())
cache.Set("a", &SizedItem{0, 2}, time.Minute)
cache.Set("b", &SizedItem{0, 3}, time.Minute)
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 5)
cache.Set("b", &SizedItem{0, 3}, time.Minute)
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 5)
cache.Set("b", &SizedItem{0, 4}, time.Minute)
Expect(cache.size).To.Equal(int64(6))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 6)
cache.Set("b", &SizedItem{0, 2}, time.Minute)
Expect(cache.size).To.Equal(int64(4))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 4)
cache.Delete("b")
time.Sleep(time.Millisecond * 10)
Expect(cache.size).To.Equal(int64(2))
time.Sleep(time.Millisecond * 100)
checkSize(cache, 2)
}
func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
@@ -108,7 +186,8 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("2", &SizedItem{1, 2}, time.Minute)
cache.Set("3", &SizedItem{1, 2}, time.Minute)
cache.Replace("4", &SizedItem{1, 2})
Expect(cache.size).To.Equal(int64(6))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 6)
}
func (_ CacheTests) ReplaceChangesSize() {
@@ -117,13 +196,16 @@ func (_ CacheTests) ReplaceChangesSize() {
cache.Set("2", &SizedItem{1, 2}, time.Minute)
cache.Replace("2", &SizedItem{1, 2})
Expect(cache.size).To.Equal(int64(4))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 4)
cache.Replace("2", &SizedItem{1, 1})
Expect(cache.size).To.Equal(int64(3))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 3)
cache.Replace("2", &SizedItem{1, 3})
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkSize(cache, 5)
}
type SizedItem struct {
@@ -134,3 +216,15 @@ type SizedItem struct {
func (s *SizedItem) Size() int64 {
return s.s
}
func checkSize(cache *Cache, sz int64) {
cache.Stop()
Expect(cache.size).To.Equal(sz)
cache.restart()
}
func gcCache(cache *Cache) {
cache.Stop()
cache.gc()
cache.restart()
}

configuration.go

@@ -8,6 +8,7 @@ type Configuration struct {
promoteBuffer int
getsPerPromote int32
tracking bool
onDelete func(item *Item)
}
// Creates a configuration object with sensible defaults
@@ -65,7 +66,7 @@ func (c *Configuration) DeleteBuffer(size uint32) *Configuration {
return c
}
// Given a large cache with a high read / write ratio, it's usually unecessary
// Given a large cache with a high read / write ratio, it's usually unnecessary
// to promote an item on every Get. GetsPerPromote specifies the number of Gets
// a key must have before being promoted
// [3]
@@ -92,3 +93,11 @@ func (c *Configuration) Track() *Configuration {
c.tracking = true
return c
}
// OnDelete allows setting a callback function to react to item deletion.
// This typically allows cleanup of resources, such as calling Close() on
// cached objects that require some kind of tear-down.
func (c *Configuration) OnDelete(callback func(item *Item)) *Configuration {
c.onDelete = callback
return c
}

8
go.mod Normal file

@@ -0,0 +1,8 @@
module github.com/karlseguin/ccache
go 1.13
require (
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0
)

6
go.sum Normal file

@@ -0,0 +1,6 @@
github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY=
github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLMY72r5J4sEfkuE7AFbixEP2qRbEcum/wA=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlVjOeLOBz+CPAI8dnbqNSVwUwRrkp7vQ=
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM=

13
item.go

@@ -60,14 +60,15 @@ func newItem(key string, value interface{}, expires int64) *Item {
return &Item{
key: key,
value: value,
promotions: -1,
promotions: 0,
size: size,
expires: expires,
}
}
func (i *Item) shouldPromote(getsPerPromote int32) bool {
return atomic.AddInt32(&i.promotions, 1) == getsPerPromote
i.promotions += 1
return i.promotions == getsPerPromote
}
func (i *Item) Value() interface{} {
@@ -84,19 +85,19 @@ func (i *Item) Release() {
func (i *Item) Expired() bool {
expires := atomic.LoadInt64(&i.expires)
return expires < time.Now().Unix()
return expires < time.Now().UnixNano()
}
func (i *Item) TTL() time.Duration {
expires := atomic.LoadInt64(&i.expires)
return time.Second * time.Duration(expires-time.Now().Unix())
return time.Nanosecond * time.Duration(expires-time.Now().UnixNano())
}
func (i *Item) Expires() time.Time {
expires := atomic.LoadInt64(&i.expires)
return time.Unix(expires, 0)
return time.Unix(0, expires)
}
func (i *Item) Extend(duration time.Duration) {
atomic.StoreInt64(&i.expires, time.Now().Add(duration).Unix())
atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano())
}

item_test.go

@@ -1,9 +1,11 @@
package ccache
import (
. "github.com/karlseguin/expect"
"math"
"testing"
"time"
. "github.com/karlseguin/expect"
)
type ItemTests struct{}
@@ -19,29 +21,29 @@ func (_ *ItemTests) Promotability() {
}
func (_ *ItemTests) Expired() {
now := time.Now().Unix()
item1 := &Item{expires: now + 1}
item2 := &Item{expires: now - 1}
now := time.Now().UnixNano()
item1 := &Item{expires: now + (10 * int64(time.Millisecond))}
item2 := &Item{expires: now - (10 * int64(time.Millisecond))}
Expect(item1.Expired()).To.Equal(false)
Expect(item2.Expired()).To.Equal(true)
}
func (_ *ItemTests) TTL() {
now := time.Now().Unix()
item1 := &Item{expires: now + 10}
item2 := &Item{expires: now - 10}
Expect(item1.TTL()).To.Equal(time.Second * 10)
Expect(item2.TTL()).To.Equal(time.Second * -10)
now := time.Now().UnixNano()
item1 := &Item{expires: now + int64(time.Second)}
item2 := &Item{expires: now - int64(time.Second)}
Expect(int(math.Ceil(item1.TTL().Seconds()))).To.Equal(1)
Expect(int(math.Ceil(item2.TTL().Seconds()))).To.Equal(-1)
}
func (_ *ItemTests) Expires() {
now := time.Now().Unix()
item := &Item{expires: now + 10}
Expect(item.Expires().Unix()).To.Equal(now + 10)
now := time.Now().UnixNano()
item := &Item{expires: now + (10)}
Expect(item.Expires().UnixNano()).To.Equal(now + 10)
}
func (_ *ItemTests) Extend() {
item := &Item{expires: time.Now().Unix() + 10}
item := &Item{expires: time.Now().UnixNano() + 10}
item.Extend(time.Minute * 2)
Expect(item.Expires().Unix()).To.Equal(time.Now().Unix() + 120)
}

layeredbucket.go

@@ -10,17 +10,35 @@ type layeredBucket struct {
buckets map[string]*bucket
}
func (b *layeredBucket) itemCount() int {
count := 0
b.RLock()
defer b.RUnlock()
for _, b := range b.buckets {
count += b.itemCount()
}
return count
}
func (b *layeredBucket) get(primary, secondary string) *Item {
bucket := b.getSecondaryBucket(primary)
if bucket == nil {
return nil
}
return bucket.get(secondary)
}
func (b *layeredBucket) getSecondaryBucket(primary string) *bucket {
b.RLock()
bucket, exists := b.buckets[primary]
b.RUnlock()
if exists == false {
return nil
}
return bucket.get(secondary)
return bucket
}
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool, int64) {
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, *Item) {
b.Lock()
bkt, exists := b.buckets[primary]
if exists == false {
@@ -28,21 +46,9 @@ func (b *layeredBucket) set(primary, secondary string, value interface{}, durati
b.buckets[primary] = bkt
}
b.Unlock()
item, new, d := bkt.set(secondary, value, duration)
if new {
item.group = primary
}
return item, new, d
}
func (b *layeredBucket) replace(primary, secondary string, value interface{}) (bool, int64) {
b.Lock()
bucket, exists := b.buckets[primary]
b.Unlock()
if exists == false {
return false, 0
}
return bucket.replace(secondary, value)
item, existing := bkt.set(secondary, value, duration)
item.group = primary
return item, existing
}
func (b *layeredBucket) delete(primary, secondary string) *Item {

layeredcache.go

@@ -10,12 +10,15 @@ import (
type LayeredCache struct {
*Configuration
list *list.List
buckets []*layeredBucket
bucketMask uint32
size int64
deletables chan *Item
promotables chan *Item
list *list.List
buckets []*layeredBucket
bucketMask uint32
size int64
deletables chan *Item
promotables chan *Item
donec chan struct{}
getDroppedReq chan struct{}
getDroppedRes chan int
}
// Create a new layered cache with the specified configuration.
@@ -38,33 +41,59 @@ func Layered(config *Configuration) *LayeredCache {
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
getDroppedReq: make(chan struct{}),
getDroppedRes: make(chan int),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket{
buckets: make(map[string]*bucket),
}
}
go c.worker()
c.restart()
return c
}
func (c *LayeredCache) ItemCount() int {
count := 0
for _, b := range c.buckets {
count += b.itemCount()
}
return count
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *LayeredCache) Get(primary, secondary string) *Item {
bucket := c.bucket(primary)
item := bucket.get(primary, secondary)
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return nil
}
if item.expires > time.Now().Unix() {
c.conditionalPromote(item)
if item.expires > time.Now().UnixNano() {
c.promote(item)
}
return item
}
// Get the secondary cache for a given primary key. This operation will
// never return nil. In the case where the primary key does not exist, a
// new, underlying, empty bucket will be created and returned.
func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
primaryBkt := c.bucket(primary)
bkt := primaryBkt.getSecondaryBucket(primary)
primaryBkt.Lock()
if bkt == nil {
bkt = &bucket{lookup: make(map[string]*Item)}
primaryBkt.buckets[primary] = bkt
}
primaryBkt.Unlock()
return &SecondaryCache{
bucket: bkt,
pCache: c,
}
}
// Used when the cache was created with the Track() configuration option.
// Avoid otherwise
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
@@ -78,41 +107,34 @@ func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
// Set the value in the cache for the specified duration
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
item, new, d := c.bucket(primary).set(primary, secondary, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
if d != 0 {
atomic.AddInt64(&c.size, d)
}
c.set(primary, secondary, value, duration)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset item's TTL nor does it alter its position in the LRU
func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
exists, d := c.bucket(primary).replace(primary, secondary, value)
if d != 0 {
atomic.AddInt64(&c.size, d)
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return false
}
return exists
c.Set(primary, secondary, value, item.TTL())
return true
}
// Attempts to get the value from the cache and calls fetch on a miss.
// If fetch returns an error, no value is cached and the error is returned back
// to the caller.
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := c.Get(primary, secondary)
if item != nil {
return item, nil
}
value, err := fetch()
if err == nil {
c.Set(primary, secondary, value, duration)
if err != nil {
return nil, err
}
return value, err
return c.set(primary, secondary, value, duration), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
@@ -139,69 +161,106 @@ func (c *LayeredCache) Clear() {
c.list = list.New()
}
func (c *LayeredCache) Stop() {
close(c.promotables)
<-c.donec
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *LayeredCache) GetDropped() int {
c.getDroppedReq <- struct{}{}
return <-c.getDroppedRes
}
func (c *LayeredCache) restart() {
c.promotables = make(chan *Item, c.promoteBuffer)
c.donec = make(chan struct{})
go c.worker()
}
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(primary).set(primary, secondary, value, duration)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
return item
}
func (c *LayeredCache) bucket(key string) *layeredBucket {
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache) conditionalPromote(item *Item) {
if item.shouldPromote(c.getsPerPromote) == false {
return
}
c.promote(item)
}
func (c *LayeredCache) promote(item *Item) {
c.promotables <- item
}
func (c *LayeredCache) worker() {
defer close(c.donec)
dropped := 0
for {
select {
case item := <-c.promotables:
if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxSize {
c.gc()
case item, ok := <-c.promotables:
if ok == false {
return
}
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
case item := <-c.deletables:
atomic.AddInt64(&c.size, -item.size)
if item.element == nil {
item.promotions = -2
atomic.StoreInt32(&item.promotions, -2)
} else {
c.size -= item.size
if c.onDelete != nil {
c.onDelete(item)
}
c.list.Remove(item.element)
}
case _ = <-c.getDroppedReq:
c.getDroppedRes <- dropped
dropped = 0
}
}
}
func (c *LayeredCache) doPromote(item *Item) bool {
// deleted before it ever got promoted
if item.promotions == -2 {
if atomic.LoadInt32(&item.promotions) == -2 {
return false
}
item.promotions = 0
if item.element != nil { //not a new item
c.list.MoveToFront(item.element)
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.element)
atomic.StoreInt32(&item.promotions, 0)
}
return false
}
c.size += item.size
item.element = c.list.PushFront(item)
return true
}
func (c *LayeredCache) gc() {
func (c *LayeredCache) gc() int {
element := c.list.Back()
dropped := 0
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
return dropped
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
atomic.AddInt64(&c.size, -item.size)
c.bucket(item.group).delete(item.group, item.key)
c.size -= item.size
c.list.Remove(element)
item.promotions = -2
dropped += 1
}
element = prev
}
return dropped
}

layeredcache_test.go

@@ -1,10 +1,11 @@
package ccache
import (
. "github.com/karlseguin/expect"
"strconv"
"testing"
"time"
. "github.com/karlseguin/expect"
)
type LayeredCacheTests struct{}
@@ -16,6 +17,7 @@ func Test_LayeredCache(t *testing.T) {
func (_ *LayeredCacheTests) GetsANonExistantValue() {
cache := newLayered()
Expect(cache.Get("spice", "flow")).To.Equal(nil)
Expect(cache.ItemCount()).To.Equal(0)
}
func (_ *LayeredCacheTests) SetANewValue() {
@@ -23,6 +25,7 @@ func (_ *LayeredCacheTests) SetANewValue() {
cache.Set("spice", "flow", "a value", time.Minute)
Expect(cache.Get("spice", "flow").Value()).To.Equal("a value")
Expect(cache.Get("spice", "stop")).To.Equal(nil)
Expect(cache.ItemCount()).To.Equal(1)
}
func (_ *LayeredCacheTests) SetsMultipleValueWithinTheSameLayer() {
@@ -37,6 +40,7 @@ func (_ *LayeredCacheTests) SetsMultipleValueWithinTheSameLayer() {
Expect(cache.Get("leto", "sister").Value()).To.Equal("ghanima")
Expect(cache.Get("leto", "brother")).To.Equal(nil)
Expect(cache.Get("baron", "friend")).To.Equal(nil)
Expect(cache.ItemCount()).To.Equal(3)
}
func (_ *LayeredCacheTests) ReplaceDoesNothingIfKeyDoesNotExist() {
@@ -50,6 +54,7 @@ func (_ *LayeredCacheTests) ReplaceUpdatesTheValue() {
cache.Set("spice", "flow", "value-a", time.Minute)
Expect(cache.Replace("spice", "flow", "value-b")).To.Equal(true)
Expect(cache.Get("spice", "flow").Value().(string)).To.Equal("value-b")
Expect(cache.ItemCount()).To.Equal(1)
//not sure how to test that the TTL hasn't changed, short of a sleep..
}
@@ -63,6 +68,34 @@ func (_ *LayeredCacheTests) DeletesAValue() {
Expect(cache.Get("spice", "must").Value()).To.Equal("value-b")
Expect(cache.Get("spice", "worm")).To.Equal(nil)
Expect(cache.Get("leto", "sister").Value()).To.Equal("ghanima")
Expect(cache.ItemCount()).To.Equal(2)
}
func (_ *LayeredCacheTests) OnDeleteCallbackCalled() {
onDeleteFnCalled := false
onDeleteFn := func(item *Item) {
if item.group == "spice" && item.key == "flow" {
onDeleteFnCalled = true
}
}
cache := Layered(Configure().OnDelete(onDeleteFn))
cache.Set("spice", "flow", "value-a", time.Minute)
cache.Set("spice", "must", "value-b", time.Minute)
cache.Set("leto", "sister", "ghanima", time.Minute)
time.Sleep(time.Millisecond * 10) // Run once to init
cache.Delete("spice", "flow")
time.Sleep(time.Millisecond * 10) // Wait for worker to pick up deleted items
Expect(cache.Get("spice", "flow")).To.Equal(nil)
Expect(cache.Get("spice", "must").Value()).To.Equal("value-b")
Expect(cache.Get("spice", "worm")).To.Equal(nil)
Expect(cache.Get("leto", "sister").Value()).To.Equal("ghanima")
Expect(onDeleteFnCalled).To.Equal(true)
}
func (_ *LayeredCacheTests) DeletesALayer() {
@@ -86,7 +119,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() {
cache.Set("xx", "b", 9001, time.Minute)
//let the items get promoted (and added to our list)
time.Sleep(time.Millisecond * 10)
cache.gc()
gcLayeredCache(cache)
Expect(cache.Get("xx", "a")).To.Equal(nil)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
Expect(cache.Get("8", "a")).To.Equal(nil)
@@ -102,7 +135,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() {
time.Sleep(time.Millisecond * 10) //run the worker once to init the list
cache.Get("9", "a")
time.Sleep(time.Millisecond * 10)
cache.gc()
gcLayeredCache(cache)
Expect(cache.Get("9", "a").Value()).To.Equal(9)
Expect(cache.Get("10", "a")).To.Equal(nil)
Expect(cache.Get("11", "a").Value()).To.Equal(11)
@@ -115,11 +148,11 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
}
item := cache.TrackingGet("0", "a")
time.Sleep(time.Millisecond * 10)
cache.gc()
gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release()
cache.gc()
gcLayeredCache(cache)
Expect(cache.Get("0", "a")).To.Equal(nil)
}
@@ -137,6 +170,8 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
Expect(cache.Get("2", "a")).To.Equal(nil)
Expect(cache.Get("3", "a").Value()).To.Equal(3)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
Expect(cache.GetDropped()).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(0)
}
func newLayered() *LayeredCache {
@@ -152,24 +187,29 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFullBySizer() {
Expect(cache.Get("pri", "0")).To.Equal(nil)
Expect(cache.Get("pri", "1")).To.Equal(nil)
Expect(cache.Get("pri", "2")).To.Equal(nil)
Expect(cache.Get("pri", "3").Value().(*SizedItem).id).To.Equal(3)
Expect(cache.Get("pri", "3")).To.Equal(nil)
Expect(cache.Get("pri", "4").Value().(*SizedItem).id).To.Equal(4)
}
func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() {
cache := Layered(Configure())
cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute)
Expect(cache.size).To.Equal(int64(6))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 6)
cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute)
cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute)
Expect(cache.size).To.Equal(int64(7))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 7)
cache.Delete("pri", "b")
time.Sleep(time.Millisecond * 10)
Expect(cache.size).To.Equal(int64(5))
checkLayeredSize(cache, 5)
}
func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
@@ -178,7 +218,8 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute)
cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute)
cache.Replace("sec", "3", &SizedItem{1, 2})
Expect(cache.size).To.Equal(int64(6))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 6)
}
func (_ LayeredCacheTests) ReplaceChangesSize() {
@@ -187,11 +228,26 @@ func (_ LayeredCacheTests) ReplaceChangesSize() {
cache.Set("pri", "2", &SizedItem{1, 2}, time.Minute)
cache.Replace("pri", "2", &SizedItem{1, 2})
Expect(cache.size).To.Equal(int64(4))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 4)
cache.Replace("pri", "2", &SizedItem{1, 1})
Expect(cache.size).To.Equal(int64(3))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 3)
cache.Replace("pri", "2", &SizedItem{1, 3})
Expect(cache.size).To.Equal(int64(5))
time.Sleep(time.Millisecond * 5)
checkLayeredSize(cache, 5)
}
func checkLayeredSize(cache *LayeredCache, sz int64) {
cache.Stop()
Expect(cache.size).To.Equal(sz)
cache.restart()
}
func gcLayeredCache(cache *LayeredCache) {
cache.Stop()
cache.gc()
cache.restart()
}

readme.md

@@ -7,11 +7,15 @@ Lock contention on the list is reduced by:
* Using a buffered channel to queue promotions for a single worker
* Garbage collecting within the same thread as the worker
Unless otherwise stated, all methods are thread-safe.
## Setup
First, download the project:
```go
go get github.com/karlseguin/ccache
```
## Configuration
Next, import and create a `Cache` instance:
@@ -28,7 +32,7 @@ var cache = ccache.New(ccache.Configure())
`Configure` exposes a chainable API:
```go
var cache = ccache.New(ccache.Configure().MaxSize(1000).itemsToPrune(100))
var cache = ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100))
```
The most likely configuration options to tweak are:
@@ -89,6 +93,12 @@ item, err := cache.Fetch("user:4", time.Minute * 10, func() (interface{}, error)
cache.Delete("user:4")
```
### DeletePrefix
`DeletePrefix` deletes all keys matching the provided prefix. Returns the number of keys removed.
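A minimal sketch of how this might be used (the keys are illustrative):
```go
cache.Set("user:4:name", "goku", time.Minute)
cache.Set("user:4:power", 9001, time.Minute)
// removes both entries above and returns 2
removed := cache.DeletePrefix("user:4:")
```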
### Clear
`Clear` clears the cache. This method is **not** thread safe. It is meant to be used from tests.
### Extend
The life of an item can be changed via the `Extend` method. This will change the expiry of the item by the specified duration relative to the current time.
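For instance (a sketch; assumes the item is currently cached):
```go
if item := cache.Get("user:4"); item != nil {
	// push the expiry to 10 minutes from now
	item.Extend(time.Minute * 10)
}
```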
@@ -101,6 +111,18 @@ cache.Replace("user:4", user)
`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.
### GetDropped
You can get the number of keys evicted due to memory pressure by calling `GetDropped`:
```go
dropped := cache.GetDropped()
```
The counter is reset on every call. If the cache's gc is running, `GetDropped` waits for it to finish; it's meant to be called asynchronously for statistics/monitoring purposes.
### Stop
The cache's background worker can be stopped by calling `Stop`. Once `Stop` is called
the cache should not be used (calls are likely to panic). Stop must be called in order to allow the garbage collector to reap the cache.
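A sketch of a typical shutdown, assuming nothing touches the cache afterwards:
```go
cache := ccache.New(ccache.Configure())
// ... use the cache ...

// stop the background worker so the GC can reap the cache;
// any Get/Set/Delete after this point is likely to panic
cache.Stop()
```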
## Tracking
CCache supports a special tracking mode which is meant to be used in conjunction with other pieces of your code that maintains a long-lived reference to data.
@@ -118,11 +140,11 @@ user := item.Value() //will be nil if "user:4" didn't exist in the cache
item.Release() //can be called even if item.Value() returned nil
```
In practive, `Release` wouldn't be called until later, at some other place in your code.
In practice, `Release` wouldn't be called until later, at some other place in your code.
There's a couple of reasons to use the tracking mode if other parts of your code also hold references to objects. First, if you're already going to hold a reference to these objects, there's really no reason not to have them in the cache - the memory is used up anyways.
More important, it helps ensure that you're code returns consistent data. With tracking, "user:4" might be purged, and a subsequent `Fetch` would reload the data. This can result in different versions of "user:4" being returned by different parts of your system.
More important, it helps ensure that your code returns consistent data. With tracking, "user:4" might be purged, and a subsequent `Fetch` would reload the data. This can result in different versions of "user:4" being returned by different parts of your system.
## LayeredCache
@@ -146,7 +168,23 @@ cache.Delete("/users/goku", "type:xml")
// OR
cache.DeleteAll("/users/goku")
```
# SecondaryCache
In some cases, when using a `LayeredCache`, it may be desirable to always be acting on the secondary portion of the cache entry. This could be the case where the primary key is used as a key elsewhere in your code. The `SecondaryCache` is retrieved with:
```go
cache := ccache.Layered(ccache.Configure())
sCache := cache.GetOrCreateSecondaryCache("/users/goku")
sCache.Set("type:json", "{value_to_cache}", time.Minute * 5)
```
The semantics for interacting with the `SecondaryCache` are exactly the same as for a regular `Cache`. However, one difference is that `Get` will not return nil, but will return an empty 'cache' for a non-existent primary key.
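For example, a `Fetch` through the secondary cache looks just like one on a regular `Cache` (a sketch; `loadUser` is a hypothetical loader):
```go
item, err := sCache.Fetch("type:json", time.Minute * 5, func() (interface{}, error) {
	return loadUser(), nil // hypothetical
})
if err == nil {
	fmt.Println(item.Value())
}
```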
## Size
By default, items added to a cache have a size of 1. This means that if you configure `MaxSize(10000)`, you'll be able to store 10000 items in the cache.
However, if the values you set into the cache have a method `Size() int64`, this size will be used. Note that ccache has an overhead of ~350 bytes per entry, which isn't taken into account. In other words, given a filled up cache, with `MaxSize(4096000)` and items that return a `Size() int64` of 2048, we can expect to find 2000 items (4096000/2048) taking a total space of 4796000 bytes.
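A minimal sketch of a value that opts into sized accounting (the type is illustrative):
```go
type Document struct {
	body []byte
}

// Size tells ccache how many units this entry consumes against MaxSize
func (d *Document) Size() int64 {
	return int64(len(d.body))
}
```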
## Want Something Simpler?
For a simpler cache, checkout out [rcache](https://github.com/karlseguin/rcache)

72
secondarycache.go Normal file

@@ -0,0 +1,72 @@
package ccache
import "time"
type SecondaryCache struct {
bucket *bucket
pCache *LayeredCache
}
// Get the secondary key.
// The semantics are the same as for LayeredCache.Get
func (s *SecondaryCache) Get(secondary string) *Item {
return s.bucket.get(secondary)
}
// Set the secondary key to a value.
// The semantics are the same as for LayeredCache.Set
func (s *SecondaryCache) Set(secondary string, value interface{}, duration time.Duration) *Item {
item, existing := s.bucket.set(secondary, value, duration)
if existing != nil {
s.pCache.deletables <- existing
}
s.pCache.promote(item)
return item
}
// Fetch or set a secondary key.
// The semantics are the same as for LayeredCache.Fetch
func (s *SecondaryCache) Fetch(secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := s.Get(secondary)
if item != nil {
return item, nil
}
value, err := fetch()
if err != nil {
return nil, err
}
return s.Set(secondary, value, duration), nil
}
// Delete a secondary key.
// The semantics are the same as for LayeredCache.Delete
func (s *SecondaryCache) Delete(secondary string) bool {
item := s.bucket.delete(secondary)
if item != nil {
s.pCache.deletables <- item
return true
}
return false
}
// Replace a secondary key.
// The semantics are the same as for LayeredCache.Replace
func (s *SecondaryCache) Replace(secondary string, value interface{}) bool {
item := s.Get(secondary)
if item == nil {
return false
}
s.Set(secondary, value, item.TTL())
return true
}
// Track a secondary key.
// The semantics are the same as for LayeredCache.TrackingGet
func (c *SecondaryCache) TrackingGet(secondary string) TrackedItem {
item := c.Get(secondary)
if item == nil {
return NilTracked
}
item.track()
return item
}

105
secondarycache_test.go Normal file

@@ -0,0 +1,105 @@
package ccache
import (
. "github.com/karlseguin/expect"
"strconv"
"testing"
"time"
)
type SecondaryCacheTests struct{}
func Test_SecondaryCache(t *testing.T) {
Expectify(new(SecondaryCacheTests), t)
}
func (_ SecondaryCacheTests) GetsANonExistantValue() {
cache := newLayered().GetOrCreateSecondaryCache("foo")
Expect(cache).Not.To.Equal(nil)
}
func (_ SecondaryCacheTests) SetANewValue() {
cache := newLayered()
cache.Set("spice", "flow", "a value", time.Minute)
sCache := cache.GetOrCreateSecondaryCache("spice")
Expect(sCache.Get("flow").Value()).To.Equal("a value")
Expect(sCache.Get("stop")).To.Equal(nil)
}
func (_ SecondaryCacheTests) ValueCanBeSeenInBothCaches1() {
cache := newLayered()
cache.Set("spice", "flow", "a value", time.Minute)
sCache := cache.GetOrCreateSecondaryCache("spice")
sCache.Set("orinoco", "another value", time.Minute)
Expect(sCache.Get("orinoco").Value()).To.Equal("another value")
Expect(cache.Get("spice", "orinoco").Value()).To.Equal("another value")
}
func (_ SecondaryCacheTests) ValueCanBeSeenInBothCaches2() {
cache := newLayered()
sCache := cache.GetOrCreateSecondaryCache("spice")
sCache.Set("flow", "a value", time.Minute)
Expect(sCache.Get("flow").Value()).To.Equal("a value")
Expect(cache.Get("spice", "flow").Value()).To.Equal("a value")
}
func (_ SecondaryCacheTests) DeletesAreReflectedInBothCaches() {
cache := newLayered()
cache.Set("spice", "flow", "a value", time.Minute)
cache.Set("spice", "sister", "ghanima", time.Minute)
sCache := cache.GetOrCreateSecondaryCache("spice")
cache.Delete("spice", "flow")
Expect(cache.Get("spice", "flow")).To.Equal(nil)
Expect(sCache.Get("flow")).To.Equal(nil)
sCache.Delete("sister")
Expect(cache.Get("spice", "sister")).To.Equal(nil)
Expect(sCache.Get("sister")).To.Equal(nil)
}
func (_ SecondaryCacheTests) ReplaceDoesNothingIfKeyDoesNotExist() {
cache := newLayered()
sCache := cache.GetOrCreateSecondaryCache("spice")
Expect(sCache.Replace("flow", "value-a")).To.Equal(false)
Expect(cache.Get("spice", "flow")).To.Equal(nil)
}
func (_ SecondaryCacheTests) ReplaceUpdatesTheValue() {
cache := newLayered()
cache.Set("spice", "flow", "value-a", time.Minute)
sCache := cache.GetOrCreateSecondaryCache("spice")
Expect(sCache.Replace("flow", "value-b")).To.Equal(true)
Expect(cache.Get("spice", "flow").Value().(string)).To.Equal("value-b")
}
func (_ SecondaryCacheTests) FetchReturnsAnExistingValue() {
cache := newLayered()
cache.Set("spice", "flow", "value-a", time.Minute)
sCache := cache.GetOrCreateSecondaryCache("spice")
val, _ := sCache.Fetch("flow", time.Minute, func() (interface{}, error) { return "a fetched value", nil })
Expect(val.Value().(string)).To.Equal("value-a")
}
func (_ SecondaryCacheTests) FetchReturnsANewValue() {
cache := newLayered()
sCache := cache.GetOrCreateSecondaryCache("spice")
val, _ := sCache.Fetch("flow", time.Minute, func() (interface{}, error) { return "a fetched value", nil })
Expect(val.Value().(string)).To.Equal("a fetched value")
}
func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() {
cache := Layered(Configure().ItemsToPrune(10).Track())
for i := 0; i < 10; i++ {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
sCache := cache.GetOrCreateSecondaryCache("0")
item := sCache.TrackingGet("a")
time.Sleep(time.Millisecond * 10)
gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release()
gcLayeredCache(cache)
Expect(cache.Get("0", "a")).To.Equal(nil)
}