diff --git a/bucket.go b/bucket.go index 9d95b44..b93d51e 100644 --- a/bucket.go +++ b/bucket.go @@ -1,53 +1,53 @@ package ccache import ( - "sync" - "time" + "sync" + "time" ) type Bucket struct { - sync.RWMutex - lookup map[string]*Item + sync.RWMutex + lookup map[string]*Item } func (b *Bucket) get(key string) *Item { - b.RLock() - defer b.RUnlock() - return b.lookup[key] + b.RLock() + defer b.RUnlock() + return b.lookup[key] } func (b *Bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool) { - expires := time.Now().Add(duration) - b.Lock() - defer b.Unlock() - if existing, exists := b.lookup[key]; exists { - existing.Lock() - existing.value = value - existing.expires = expires - existing.Unlock() - return existing, false - } - item := newItem(key, value, expires) - b.lookup[key] = item - return item, true + expires := time.Now().Add(duration) + b.Lock() + defer b.Unlock() + if existing, exists := b.lookup[key]; exists { + existing.Lock() + existing.value = value + existing.expires = expires + existing.Unlock() + return existing, false + } + item := newItem(key, value, expires) + b.lookup[key] = item + return item, true } func (b *Bucket) delete(key string) { - b.Lock() - defer b.Unlock() - delete(b.lookup, key) + b.Lock() + defer b.Unlock() + delete(b.lookup, key) } -func (b *Bucket) getAndDelete(key string) *Item{ - b.Lock() - defer b.Unlock() - item := b.lookup[key] - delete(b.lookup, key) - return item +func (b *Bucket) getAndDelete(key string) *Item { + b.Lock() + defer b.Unlock() + item := b.lookup[key] + delete(b.lookup, key) + return item } func (b *Bucket) clear() { - b.Lock() - defer b.Unlock() - b.lookup = make(map[string]*Item) + b.Lock() + defer b.Unlock() + b.lookup = make(map[string]*Item) } diff --git a/bucket_test.go b/bucket_test.go index 6b34360..23d086f 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -1,64 +1,64 @@ package ccache import ( - "time" - "testing" - "github.com/viki-org/gspec" + "github.com/karlseguin/gspec" + "testing" + "time" ) func TestGetMissFromBucket(t *testing.T) { - bucket := testBucket() - gspec.New(t).Expect(bucket.get("invalid")).ToBeNil() + bucket := testBucket() + gspec.New(t).Expect(bucket.get("invalid")).ToBeNil() } func TestGetHitFromBucket(t *testing.T) { - bucket := testBucket() - item := bucket.get("power") - assertValue(t, item, "9000") + bucket := testBucket() + item := bucket.get("power") + assertValue(t, item, "9000") } func TestDeleteItemFromBucket(t *testing.T) { - bucket := testBucket() - bucket.delete("power") - gspec.New(t).Expect(bucket.get("power")).ToBeNil() + bucket := testBucket() + bucket.delete("power") + gspec.New(t).Expect(bucket.get("power")).ToBeNil() } func TestSetsANewBucketItem(t *testing.T) { - spec := gspec.New(t) - bucket := testBucket() - item, new := bucket.set("spice", TestValue("flow"), time.Minute) - assertValue(t, item, "flow") - item = bucket.get("spice") - assertValue(t, item, "flow") - spec.Expect(new).ToEqual(true) + spec := gspec.New(t) + bucket := testBucket() + item, new := bucket.set("spice", TestValue("flow"), time.Minute) + assertValue(t, item, "flow") + item = bucket.get("spice") + assertValue(t, item, "flow") + spec.Expect(new).ToEqual(true) } func TestSetsAnExistingItem(t *testing.T) { - spec := gspec.New(t) - bucket := testBucket() - item, new := bucket.set("power", TestValue("9002"), time.Minute) - assertValue(t, item, "9002") - item = bucket.get("power") - assertValue(t, item, "9002") - spec.Expect(new).ToEqual(false) + spec := gspec.New(t) + bucket := testBucket() + item, new := bucket.set("power", TestValue("9002"), time.Minute) + assertValue(t, item, "9002") + item = bucket.get("power") + assertValue(t, item, "9002") + spec.Expect(new).ToEqual(false) } func testBucket() *Bucket { - b := &Bucket{lookup: make(map[string]*Item),} - b.lookup["power"] = &Item{ - key: "power", - value: TestValue("9000"), - } - return b + b := &Bucket{lookup: make(map[string]*Item)} + b.lookup["power"] = &Item{ + key: "power", + value: TestValue("9000"), + } + return b } func assertValue(t *testing.T, item *Item, expected string) { - value := item.value.(TestValue) - gspec.New(t).Expect(value).ToEqual(TestValue(expected)) + value := item.value.(TestValue) + gspec.New(t).Expect(value).ToEqual(TestValue(expected)) } type TestValue string func (v TestValue) Expires() time.Time { - return time.Now() + return time.Now() } diff --git a/cache.go b/cache.go index 5e4290d..c447305 100644 --- a/cache.go +++ b/cache.go @@ -1,139 +1,172 @@ +// An LRU cached aimed at high concurrency package ccache import ( - "time" - "runtime" - "hash/fnv" - "container/list" + "container/list" + "hash/fnv" + "runtime" + "sync/atomic" + "time" ) type Cache struct { - *Configuration - list *list.List - buckets []*Bucket - bucketCount uint32 - deletables chan *Item - promotables chan *Item + *Configuration + list *list.List + buckets []*Bucket + bucketCount uint32 + deletables chan *Item + promotables chan *Item } func New(config *Configuration) *Cache { - c := &Cache{ - list: list.New(), - Configuration: config, - bucketCount: uint32(config.buckets), - buckets: make([]*Bucket, config.buckets), - deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), - } - for i := 0; i < config.buckets; i++ { - c.buckets[i] = &Bucket{ - lookup: make(map[string]*Item), - } - } - go c.worker() - return c + c := &Cache{ + list: list.New(), + Configuration: config, + bucketCount: uint32(config.buckets), + buckets: make([]*Bucket, config.buckets), + deletables: make(chan *Item, config.deleteBuffer), + promotables: make(chan *Item, config.promoteBuffer), + } + for i := 0; i < config.buckets; i++ { + c.buckets[i] = &Bucket{ + lookup: make(map[string]*Item), + } + } + go c.worker() + return c } func (c *Cache) Get(key string) interface{} { - bucket := c.bucket(key) - item := bucket.get(key) - if item == nil { return nil } - if item.expires.Before(time.Now()) { - c.deleteItem(bucket, item) - return nil - } - c.conditionalPromote(item) - return item.value + if item := c.get(key); item != nil { + return item.value + } + return nil +} + +func (c *Cache) TrackingGet(key string) TrackedItem { + item := c.get(key) + if item == nil { + return NilTracked + } + item.track() + return item +} + +func (c *Cache) get(key string) *Item { + bucket := c.bucket(key) + item := bucket.get(key) + if item == nil { + return nil + } + if item.expires.Before(time.Now()) { + c.deleteItem(bucket, item) + return nil + } + c.conditionalPromote(item) + return item } func (c *Cache) Set(key string, value interface{}, duration time.Duration) { - item, new := c.bucket(key).set(key, value, duration) - if new { - c.promote(item) - } else { - c.conditionalPromote(item) - } + item, new := c.bucket(key).set(key, value, duration) + if new { + c.promote(item) + } else { + c.conditionalPromote(item) + } } func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) { - item := c.Get(key) - if item != nil { return item, nil } - value, err := fetch() - if err == nil { - c.Set(key, value, duration) - } - return value, err + item := c.Get(key) + if item != nil { + return item, nil + } + value, err := fetch() + if err == nil { + c.Set(key, value, duration) + } + return value, err } func (c *Cache) Delete(key string) { - item := c.bucket(key).getAndDelete(key) - if item != nil { - c.deletables <- item - } + item := c.bucket(key).getAndDelete(key) + if item != nil { + c.deletables <- item + } } //this isn't thread safe. It's meant to be called from non-concurrent tests func (c *Cache) Clear() { - for _, bucket := range c.buckets { - bucket.clear() - } - c.list = list.New() + for _, bucket := range c.buckets { + bucket.clear() + } + c.list = list.New() } func (c *Cache) deleteItem(bucket *Bucket, item *Item) { - bucket.delete(item.key) //stop othe GETs from getting it - c.deletables <- item + bucket.delete(item.key) //stop othe GETs from getting it + c.deletables <- item } func (c *Cache) bucket(key string) *Bucket { - h := fnv.New32a() - h.Write([]byte(key)) - index := h.Sum32() % c.bucketCount - return c.buckets[index] + h := fnv.New32a() + h.Write([]byte(key)) + index := h.Sum32() % c.bucketCount + return c.buckets[index] } func (c *Cache) conditionalPromote(item *Item) { - if item.shouldPromote(c.getsPerPromote) == false { return } - c.promote(item) + if item.shouldPromote(c.getsPerPromote) == false { + return + } + c.promote(item) } func (c *Cache) promote(item *Item) { - c.promotables <- item + c.promotables <- item } func (c *Cache) worker() { - ms := new(runtime.MemStats) - for { - select { - case item := <- c.promotables: - wasNew := c.doPromote(item) - if wasNew == false { continue } - runtime.ReadMemStats(ms) - if ms.HeapAlloc > c.size { c.gc() } - case item := <- c.deletables: - c.list.Remove(item.element) - } - } + ms := new(runtime.MemStats) + for { + select { + case item := <-c.promotables: + if wasNew := c.doPromote(item); wasNew == false { + continue + } + runtime.ReadMemStats(ms) + if ms.HeapAlloc > c.size { + c.gc() + } + case item := <-c.deletables: + c.list.Remove(item.element) + } + } } func (c *Cache) doPromote(item *Item) bool { - item.Lock() - defer item.Unlock() - item.promotions = 0 - if item.element != nil { //not a new item - c.list.MoveToFront(item.element) - return false - } - item.element = c.list.PushFront(item) - return true + item.Lock() + defer item.Unlock() + item.promotions = 0 + if item.element != nil { //not a new item + c.list.MoveToFront(item.element) + return false + } + item.element = c.list.PushFront(item) + return true } func (c *Cache) gc() { - for i := 0; i < c.itemsToPrune; i++ { - element := c.list.Back() - if element == nil { return } - item := element.Value.(*Item) - c.bucket(item.key).delete(item.key) - c.list.Remove(element) - } + element := c.list.Back() + for i := 0; i < c.itemsToPrune; i++ { + if element == nil { + return + } + prev := element.Prev() + item := element.Value.(*Item) + if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { + c.bucket(item.key).delete(item.key) + c.list.Remove(element) + } + element = prev + } } diff --git a/cache_test.go b/cache_test.go new file mode 100644 index 0000000..f62e16c --- /dev/null +++ b/cache_test.go @@ -0,0 +1,49 @@ +package ccache + +import ( + "github.com/karlseguin/gspec" + "testing" + "strconv" + "time" +) + +func TestGCsTheOldestItems(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10)) + for i := 0; i < 500; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + cache.gc() + spec.Expect(cache.Get("9")).ToBeNil() + spec.Expect(cache.Get("10").(int)).ToEqual(10) +} + +func TestPromotedItemsDontGetPruned(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10).GetsPerPromote(1)) + for i := 0; i < 500; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + cache.Get("9") + time.Sleep(time.Millisecond * 10) + cache.gc() + spec.Expect(cache.Get("9").(int)).ToEqual(9) + spec.Expect(cache.Get("10")).ToBeNil() + spec.Expect(cache.Get("11").(int)).ToEqual(11) +} + +func TestTrackerDoesNotCleanupHeldInstance(t *testing.T) { + spec := gspec.New(t) + cache := New(Configure().ItemsToPrune(10).Track()) + for i := 0; i < 10; i++ { + cache.Set(strconv.Itoa(i), i, time.Minute) + } + item := cache.TrackingGet("0") + time.Sleep(time.Millisecond * 10) + cache.gc() + spec.Expect(cache.Get("0").(int)).ToEqual(0) + spec.Expect(cache.Get("1")).ToBeNil() + item.Release() + cache.gc() + spec.Expect(cache.Get("0")).ToBeNil() +} diff --git a/configuration.go b/configuration.go index d6cef98..f38e855 100644 --- a/configuration.go +++ b/configuration.go @@ -1,51 +1,58 @@ package ccache type Configuration struct { - size uint64 - buckets int - itemsToPrune int - deleteBuffer int - promoteBuffer int - getsPerPromote int32 + size uint64 + buckets int + itemsToPrune int + deleteBuffer int + promoteBuffer int + getsPerPromote int32 + tracking bool } func Configure() *Configuration { - return &Configuration { - buckets: 64, - itemsToPrune: 500, - deleteBuffer: 1024, - getsPerPromote: 10, - promoteBuffer: 1024, - size: 500 * 1024 * 1024, - } + return &Configuration{ + buckets: 64, + itemsToPrune: 500, + deleteBuffer: 1024, + getsPerPromote: 10, + promoteBuffer: 1024, + size: 500 * 1024 * 1024, + tracking: false, + } } func (c *Configuration) Size(bytes uint64) *Configuration { - c.size = bytes - return c + c.size = bytes + return c } func (c *Configuration) Buckets(count int) *Configuration { - c.buckets = count - return c + c.buckets = count + return c } func (c *Configuration) ItemsToPrune(count int) *Configuration { - c.itemsToPrune = count - return c + c.itemsToPrune = count + return c } func (c *Configuration) PromoteBuffer(size int) *Configuration { - c.promoteBuffer = size - return c + c.promoteBuffer = size + return c } func (c *Configuration) DeleteBuffer(size int) *Configuration { - c.deleteBuffer = size - return c + c.deleteBuffer = size + return c } func (c *Configuration) GetsPerPromote(count int) *Configuration { - c.getsPerPromote = int32(count) - return c + c.getsPerPromote = int32(count) + return c +} + +func (c *Configuration) Track() *Configuration { + c.tracking = true + return c } diff --git a/item.go b/item.go index f677ef7..80c1028 100644 --- a/item.go +++ b/item.go @@ -1,30 +1,55 @@ package ccache import ( - "sync" - "time" - "sync/atomic" - "container/list" + "container/list" + "sync" + "sync/atomic" + "time" ) +type TrackedItem interface { + Value() interface{} + Release() +} + +type nilItem struct{} + +func (n *nilItem) Value() interface{} { return nil } +func (n *nilItem) Release() {} + +var NilTracked = new(nilItem) + type Item struct { - key string - sync.RWMutex - promotions int32 - expires time.Time - value interface{} - element *list.Element + key string + sync.RWMutex + promotions int32 + refCount int32 + expires time.Time + value interface{} + element *list.Element } func newItem(key string, value interface{}, expires time.Time) *Item { - return &Item{ - key: key, - value: value, - promotions: -1, - expires: expires, - } + return &Item{ + key: key, + value: value, + promotions: -1, + expires: expires, + } } func (i *Item) shouldPromote(getsPerPromote int32) bool { - return atomic.AddInt32(&i.promotions, 1) == getsPerPromote + return atomic.AddInt32(&i.promotions, 1) == getsPerPromote +} + +func (i *Item) Value() interface{} { + return i.value +} + +func (i *Item) track() { + atomic.AddInt32(&i.refCount, 1) +} + +func (i *Item) Release() { + atomic.AddInt32(&i.refCount, -1) } diff --git a/item_test.go b/item_test.go index eda1f58..a0c6e49 100644 --- a/item_test.go +++ b/item_test.go @@ -1,13 +1,13 @@ package ccache import ( - "testing" - "github.com/viki-org/gspec" + "github.com/karlseguin/gspec" + "testing" ) func TestItemPromotability(t *testing.T) { - spec := gspec.New(t) - item := &Item{promotions: 4} - spec.Expect(item.shouldPromote(5)).ToEqual(true) - spec.Expect(item.shouldPromote(5)).ToEqual(false) + spec := gspec.New(t) + item := &Item{promotions: 4} + spec.Expect(item.shouldPromote(5)).ToEqual(true) + spec.Expect(item.shouldPromote(5)).ToEqual(false) }