13 Commits

Author SHA1 Message Date
Karl Seguin
40275a30c8 Ability to dynamically SetMaxSize
To support this, rather than adding another field/channel like
`getDroppedReq`, I added a `control` channel that can be used for these
miscellaneous interactions with the worker. The control can also be
used to take over for the `donec` channel
2020-06-26 20:22:30 +08:00
Karl Seguin
d9aec58960 add GetDropped documentation 2020-02-16 11:54:07 +08:00
Karl Seguin
1a257a89d6 add GetDropped function 2020-02-05 22:05:05 +08:00
Karl Seguin
78289f8f0b Merge pull request #38 from karlseguin/DeletePrefix
Delete prefix
2020-01-23 13:03:42 +08:00
Karl Seguin
79f9dcde21 fewer defers, document DeletePrefix 2020-01-23 12:55:55 +08:00
Karl Seguin
04261a5282 Merge branch 'master' into DeletePrefix 2020-01-23 12:50:53 +08:00
Karl Seguin
569ae60338 Merge pull request #37 from aporeto-inc/pin-expect
fixed/module: expect needs to be on master
2020-01-23 11:44:58 +08:00
Antoine Mercadal
048ac0669f fixed/module: expect needs to be on master 2020-01-22 19:35:19 -08:00
Karl Seguin
2ff4136636 Merge pull request #36 from aporeto-inc/go-modules
migrate to go modules
2020-01-23 11:19:07 +08:00
Antoine Mercadal
f79de0e254 migrate to go modules 2020-01-22 19:16:52 -08:00
Karl Seguin
356e164dd5 preliminary work on DeletePrefix 2020-01-23 10:27:12 +08:00
Karl Seguin
2ff889bcae document Clear 2020-01-23 10:04:47 +08:00
Karl Seguin
46ec5d2257 explicitly state the thread-safety nature of the library 2020-01-23 09:38:19 +08:00
10 changed files with 258 additions and 56 deletions

Gopkg.lock (generated): 29 deletions

@@ -1,29 +0,0 @@
-# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
-
-[[projects]]
-  digest = "1:65c6a2822d8653e4f780c259a86d1b444c0b1ce7601b500deb985387bfe6bdec"
-  name = "github.com/karlseguin/expect"
-  packages = [
-    ".",
-    "build",
-    "mock",
-  ]
-  pruneopts = "UT"
-  revision = "4fcda73748276dc72bcc09729bdb56242093c12c"
-  version = "v1.0.1"
-
-[[projects]]
-  branch = "master"
-  digest = "1:d594bb9f2a18ba4da7ab1368f4debf59f6b77cc7046705553f966837c12059f1"
-  name = "github.com/wsxiaoys/terminal"
-  packages = ["color"]
-  pruneopts = "UT"
-  revision = "0940f3fc43a0ed42d04916b1c04578462c650b09"
-
-[solve-meta]
-  analyzer-name = "dep"
-  analyzer-version = 1
-  input-imports = ["github.com/karlseguin/expect"]
-  solver-name = "gps-cdcl"
-  solver-version = 1

Gopkg.toml: 8 deletions

@@ -1,8 +0,0 @@
-[[constraint]]
-  name = "github.com/karlseguin/expect"
-  version = "1.0.1"
-
-[prune]
-  go-tests = true
-  unused-packages = true

bucket.go

@@ -1,6 +1,7 @@
package ccache

import (
+    "strings"
    "sync"
    "time"
)

@@ -26,22 +27,61 @@ func (b *bucket) set(key string, value interface{}, duration time.Duration) (*It
    expires := time.Now().Add(duration).UnixNano()
    item := newItem(key, value, expires)
    b.Lock()
-    defer b.Unlock()
    existing := b.lookup[key]
    b.lookup[key] = item
+    b.Unlock()
    return item, existing
}

func (b *bucket) delete(key string) *Item {
    b.Lock()
-    defer b.Unlock()
    item := b.lookup[key]
    delete(b.lookup, key)
+    b.Unlock()
    return item
}

+// This is an expensive operation, so we do what we can to optimize it and limit
+// the impact it has on concurrent operations. Specifically, we:
+// 1 - Do an initial iteration to collect matches. This allows us to do the
+//     "expensive" prefix check (on all values) using only a read-lock
+// 2 - Do a second iteration, under write lock, for the matched results to do
+//     the actual deletion
+// Also, this is the only place where the Bucket is aware of cache detail: the
+// deletables channel. Passing it here lets us avoid iterating over matched items
+// again in the cache. Further, we pass item to deletables BEFORE actually removing
+// the item from the map. I'm pretty sure this is 100% fine, but it is unique.
+// (We do this so that the write to the channel is under the read lock and not the
+// write lock)
+func (b *bucket) deletePrefix(prefix string, deletables chan *Item) int {
+    lookup := b.lookup
+    items := make([]*Item, 0, len(lookup)/10)
+
+    b.RLock()
+    for key, item := range lookup {
+        if strings.HasPrefix(key, prefix) {
+            deletables <- item
+            items = append(items, item)
+        }
+    }
+    b.RUnlock()
+
+    if len(items) == 0 {
+        // avoid the write lock if we can
+        return 0
+    }
+
+    b.Lock()
+    for _, item := range items {
+        delete(lookup, item.key)
+    }
+    b.Unlock()
+    return len(items)
+}

func (b *bucket) clear() {
    b.Lock()
-    defer b.Unlock()
    b.lookup = make(map[string]*Item)
+    b.Unlock()
}

cache.go

@@ -8,6 +8,15 @@ import (
    "time"
)

+// The cache has a generic 'control' channel that is used to send
+// messages to the worker. These are the messages that can be sent to it
+type getDropped struct {
+    res chan int
+}
+
+type setMaxSize struct {
+    size int64
+}
+
type Cache struct {
    *Configuration
    list *list.List

@@ -16,7 +25,7 @@ type Cache struct {
    bucketMask  uint32
    deletables  chan *Item
    promotables chan *Item
-    donec       chan struct{}
+    control     chan interface{}
}

// Create a new cache with the specified configuration
@@ -27,6 +36,7 @@ func New(config *Configuration) *Cache {
        Configuration: config,
        bucketMask:    uint32(config.buckets) - 1,
        buckets:       make([]*bucket, config.buckets),
+        control:       make(chan interface{}),
    }
    for i := 0; i < int(config.buckets); i++ {
        c.buckets[i] = &bucket{

@@ -45,6 +55,14 @@ func (c *Cache) ItemCount() int {
    return count
}

+func (c *Cache) DeletePrefix(prefix string) int {
+    count := 0
+    for _, b := range c.buckets {
+        count += b.deletePrefix(prefix, c.deletables)
+    }
+    return count
+}
+
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which

@@ -126,13 +144,27 @@ func (c *Cache) Clear() {
// is called are likely to panic
func (c *Cache) Stop() {
    close(c.promotables)
-    <-c.donec
+    <-c.control
}

+// Gets the number of items removed from the cache due to memory pressure since
+// the last time GetDropped was called
+func (c *Cache) GetDropped() int {
+    res := make(chan int)
+    c.control <- getDropped{res: res}
+    return <-res
+}
+
+// Sets a new max size. That can result in a GC being run if the new maximum size
+// is smaller than the cached size
+func (c *Cache) SetMaxSize(size int64) {
+    c.control <- setMaxSize{size}
+}
+
func (c *Cache) restart() {
    c.deletables = make(chan *Item, c.deleteBuffer)
    c.promotables = make(chan *Item, c.promoteBuffer)
-    c.donec = make(chan struct{})
+    c.control = make(chan interface{})
    go c.worker()
}

@@ -161,8 +193,8 @@ func (c *Cache) promote(item *Item) {
}

func (c *Cache) worker() {
-    defer close(c.donec)
+    defer close(c.control)
+    dropped := 0
    for {
        select {
        case item, ok := <-c.promotables:

@@ -170,10 +202,21 @@ func (c *Cache) worker() {
                goto drain
            }
            if c.doPromote(item) && c.size > c.maxSize {
-                c.gc()
+                dropped += c.gc()
            }
        case item := <-c.deletables:
            c.doDelete(item)
+        case control := <-c.control:
+            switch msg := control.(type) {
+            case getDropped:
+                msg.res <- dropped
+                dropped = 0
+            case setMaxSize:
+                c.maxSize = msg.size
+                if c.size > c.maxSize {
+                    dropped += c.gc()
+                }
+            }
        }
    }

@@ -219,11 +262,12 @@ func (c *Cache) doPromote(item *Item) bool {
    return true
}

-func (c *Cache) gc() {
+func (c *Cache) gc() int {
+    dropped := 0
    element := c.list.Back()
    for i := 0; i < c.itemsToPrune; i++ {
        if element == nil {
-            return
+            return dropped
        }
        prev := element.Prev()
        item := element.Value.(*Item)

@@ -234,8 +278,10 @@ func (c *Cache) gc() {
            if c.onDelete != nil {
                c.onDelete(item)
            }
+            dropped += 1
            item.promotions = -2
        }
        element = prev
    }
+    return dropped
}

cache_test.go

@@ -28,6 +28,29 @@ func (_ CacheTests) DeletesAValue() {
    Expect(cache.ItemCount()).To.Equal(1)
}

+func (_ CacheTests) DeletesAPrefix() {
+    cache := New(Configure())
+    Expect(cache.ItemCount()).To.Equal(0)
+
+    cache.Set("aaa", "1", time.Minute)
+    cache.Set("aab", "2", time.Minute)
+    cache.Set("aac", "3", time.Minute)
+    cache.Set("ac", "4", time.Minute)
+    cache.Set("z5", "7", time.Minute)
+    Expect(cache.ItemCount()).To.Equal(5)
+
+    Expect(cache.DeletePrefix("9a")).To.Equal(0)
+    Expect(cache.ItemCount()).To.Equal(5)
+
+    Expect(cache.DeletePrefix("aa")).To.Equal(3)
+    Expect(cache.Get("aaa")).To.Equal(nil)
+    Expect(cache.Get("aab")).To.Equal(nil)
+    Expect(cache.Get("aac")).To.Equal(nil)
+    Expect(cache.Get("ac").Value()).To.Equal("4")
+    Expect(cache.Get("z5").Value()).To.Equal("7")
+    Expect(cache.ItemCount()).To.Equal(2)
+}
+
func (_ CacheTests) OnDeleteCallbackCalled() {
    onDeleteFnCalled := false
    onDeleteFn := func(item *Item) {

@@ -133,6 +156,8 @@ func (_ CacheTests) RemovesOldestItemWhenFullBySizer() {
    Expect(cache.Get("2")).To.Equal(nil)
    Expect(cache.Get("3")).To.Equal(nil)
    Expect(cache.Get("4").Value().(*SizedItem).id).To.Equal(4)
+    Expect(cache.GetDropped()).To.Equal(4)
+    Expect(cache.GetDropped()).To.Equal(0)
}

func (_ CacheTests) SetUpdatesSizeOnDelta() {

@@ -183,6 +208,38 @@ func (_ CacheTests) ReplaceChangesSize() {
    checkSize(cache, 5)
}

+func (_ CacheTests) ResizeOnTheFly() {
+    cache := New(Configure().MaxSize(9).ItemsToPrune(1))
+    for i := 0; i < 5; i++ {
+        cache.Set(strconv.Itoa(i), i, time.Minute)
+    }
+    cache.SetMaxSize(3)
+    time.Sleep(time.Millisecond * 10)
+    Expect(cache.GetDropped()).To.Equal(2)
+    Expect(cache.Get("0")).To.Equal(nil)
+    Expect(cache.Get("1")).To.Equal(nil)
+    Expect(cache.Get("2").Value()).To.Equal(2)
+    Expect(cache.Get("3").Value()).To.Equal(3)
+    Expect(cache.Get("4").Value()).To.Equal(4)
+
+    cache.Set("5", 5, time.Minute)
+    time.Sleep(time.Millisecond * 5)
+    Expect(cache.GetDropped()).To.Equal(1)
+    Expect(cache.Get("2")).To.Equal(nil)
+    Expect(cache.Get("3").Value()).To.Equal(3)
+    Expect(cache.Get("4").Value()).To.Equal(4)
+    Expect(cache.Get("5").Value()).To.Equal(5)
+
+    cache.SetMaxSize(10)
+    cache.Set("6", 6, time.Minute)
+    time.Sleep(time.Millisecond * 10)
+    Expect(cache.GetDropped()).To.Equal(0)
+    Expect(cache.Get("3").Value()).To.Equal(3)
+    Expect(cache.Get("4").Value()).To.Equal(4)
+    Expect(cache.Get("5").Value()).To.Equal(5)
+    Expect(cache.Get("6").Value()).To.Equal(6)
+}
+
type SizedItem struct {
    id int
    s  int64

go.mod (new file): 8 additions

@@ -0,0 +1,8 @@
+module github.com/karlseguin/ccache
+
+go 1.13
+
+require (
+    github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003
+    github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0
+)

go.sum (new file): 6 additions

@@ -0,0 +1,6 @@
+github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY=
+github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
+github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLMY72r5J4sEfkuE7AFbixEP2qRbEcum/wA=
+github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
+github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlVjOeLOBz+CPAI8dnbqNSVwUwRrkp7vQ=
+github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM=

layeredcache.go

@@ -16,7 +16,7 @@ type LayeredCache struct {
    size        int64
    deletables  chan *Item
    promotables chan *Item
-    donec       chan struct{}
+    control     chan interface{}
}

// Create a new layered cache with the specified configuration.
@@ -39,6 +39,7 @@ func Layered(config *Configuration) *LayeredCache {
        bucketMask: uint32(config.buckets) - 1,
        buckets:    make([]*layeredBucket, config.buckets),
        deletables: make(chan *Item, config.deleteBuffer),
+        control:    make(chan interface{}),
    }
    for i := 0; i < int(config.buckets); i++ {
        c.buckets[i] = &layeredBucket{

@@ -159,12 +160,26 @@ func (c *LayeredCache) Clear() {
func (c *LayeredCache) Stop() {
    close(c.promotables)
-    <-c.donec
+    <-c.control
}

+// Gets the number of items removed from the cache due to memory pressure since
+// the last time GetDropped was called
+func (c *LayeredCache) GetDropped() int {
+    res := make(chan int)
+    c.control <- getDropped{res: res}
+    return <-res
+}
+
+// Sets a new max size. That can result in a GC being run if the new maximum size
+// is smaller than the cached size
+func (c *LayeredCache) SetMaxSize(size int64) {
+    c.control <- setMaxSize{size}
+}
+
func (c *LayeredCache) restart() {
    c.promotables = make(chan *Item, c.promoteBuffer)
-    c.donec = make(chan struct{})
+    c.control = make(chan interface{})
    go c.worker()
}

@@ -188,7 +203,8 @@ func (c *LayeredCache) promote(item *Item) {
}

func (c *LayeredCache) worker() {
-    defer close(c.donec)
+    defer close(c.control)
+    dropped := 0
    for {
        select {
        case item, ok := <-c.promotables:

@@ -196,7 +212,7 @@ func (c *LayeredCache) worker() {
                return
            }
            if c.doPromote(item) && c.size > c.maxSize {
-                c.gc()
+                dropped += c.gc()
            }
        case item := <-c.deletables:
            if item.element == nil {

@@ -208,6 +224,17 @@ func (c *LayeredCache) worker() {
                }
                c.list.Remove(item.element)
            }
+        case control := <-c.control:
+            switch msg := control.(type) {
+            case getDropped:
+                msg.res <- dropped
+                dropped = 0
+            case setMaxSize:
+                c.maxSize = msg.size
+                if c.size > c.maxSize {
+                    dropped += c.gc()
+                }
+            }
        }
    }
}

@@ -229,11 +256,12 @@ func (c *LayeredCache) doPromote(item *Item) bool {
    return true
}

-func (c *LayeredCache) gc() {
+func (c *LayeredCache) gc() int {
    element := c.list.Back()
+    dropped := 0
    for i := 0; i < c.itemsToPrune; i++ {
        if element == nil {
-            return
+            return dropped
        }
        prev := element.Prev()
        item := element.Value.(*Item)

@@ -242,7 +270,9 @@ func (c *LayeredCache) gc() {
            c.size -= item.size
            c.list.Remove(element)
            item.promotions = -2
+            dropped += 1
        }
        element = prev
    }
+    return dropped
}

layeredcache_test.go

@@ -170,6 +170,40 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
    Expect(cache.Get("2", "a")).To.Equal(nil)
    Expect(cache.Get("3", "a").Value()).To.Equal(3)
    Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
+    Expect(cache.GetDropped()).To.Equal(4)
+    Expect(cache.GetDropped()).To.Equal(0)
}

+func (_ LayeredCacheTests) ResizeOnTheFly() {
+    cache := Layered(Configure().MaxSize(9).ItemsToPrune(1))
+    for i := 0; i < 5; i++ {
+        cache.Set(strconv.Itoa(i), "a", i, time.Minute)
+    }
+    cache.SetMaxSize(3)
+    time.Sleep(time.Millisecond * 10)
+    Expect(cache.GetDropped()).To.Equal(2)
+    Expect(cache.Get("0", "a")).To.Equal(nil)
+    Expect(cache.Get("1", "a")).To.Equal(nil)
+    Expect(cache.Get("2", "a").Value()).To.Equal(2)
+    Expect(cache.Get("3", "a").Value()).To.Equal(3)
+    Expect(cache.Get("4", "a").Value()).To.Equal(4)
+
+    cache.Set("5", "a", 5, time.Minute)
+    time.Sleep(time.Millisecond * 5)
+    Expect(cache.GetDropped()).To.Equal(1)
+    Expect(cache.Get("2", "a")).To.Equal(nil)
+    Expect(cache.Get("3", "a").Value()).To.Equal(3)
+    Expect(cache.Get("4", "a").Value()).To.Equal(4)
+    Expect(cache.Get("5", "a").Value()).To.Equal(5)
+
+    cache.SetMaxSize(10)
+    cache.Set("6", "a", 6, time.Minute)
+    time.Sleep(time.Millisecond * 10)
+    Expect(cache.GetDropped()).To.Equal(0)
+    Expect(cache.Get("3", "a").Value()).To.Equal(3)
+    Expect(cache.Get("4", "a").Value()).To.Equal(4)
+    Expect(cache.Get("5", "a").Value()).To.Equal(5)
+    Expect(cache.Get("6", "a").Value()).To.Equal(6)
+}
+
func newLayered() *LayeredCache {

readme.md

@@ -7,11 +7,15 @@ Lock contention on the list is reduced by:

* Using a buffered channel to queue promotions for a single worker
* Garbage collecting within the same thread as the worker

+Unless otherwise stated, all methods are thread-safe.
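To make that design concrete, here is a minimal, self-contained sketch of the pattern (illustrative only: the `entry` type, channel size, and output are invented; the library's real worker appears in the cache.go hunks above):

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct{ key string }

func main() {
	// Promotions are queued on a buffered channel; a single worker
	// goroutine owns the recency list, so the list itself needs no lock.
	promotables := make(chan *entry, 1024)
	done := make(chan struct{})

	go func() {
		l := list.New()
		for e := range promotables {
			l.PushFront(e) // only the worker touches the list
		}
		fmt.Println("worker drained", l.Len(), "promotions")
		close(done)
	}()

	promotables <- &entry{key: "user:4"}
	close(promotables)
	<-done
}
```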
## Setup

First, download the project:

```go
go get github.com/karlseguin/ccache
```

## Configuration

Next, import and create a `Cache` instance:
@@ -89,6 +93,12 @@ item, err := cache.Fetch("user:4", time.Minute * 10, func() (interface{}, error)
cache.Delete("user:4")
```

+### DeletePrefix
+
+`DeletePrefix` deletes all keys matching the provided prefix. Returns the number of keys removed.
+
+### Clear
+
+`Clear` clears the cache. This method is **not** thread safe. It is meant to be used from tests.

### Extend

The life of an item can be changed via the `Extend` method. This will change the expiry of the item by the specified duration relative to the current time.
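A quick usage sketch for the two methods documented in this hunk (the keys are hypothetical; it assumes, as in this library's API, that `Extend` is called on the `*Item` returned by `Get`):

```go
cache.Set("user:4", user, time.Minute*10)
cache.Set("user:5", user, time.Minute*10)

// delete both "user:" entries; DeletePrefix returns the number of keys removed
removed := cache.DeletePrefix("user:") // removed == 2

// push an item's expiry out to ~10 minutes from now
if item := cache.Get("session:9"); item != nil {
	item.Extend(time.Minute * 10)
}
```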
@@ -101,6 +111,14 @@ cache.Replace("user:4", user)
```

`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.

+### GetDropped
+
+You can get the number of keys evicted due to memory pressure by calling `GetDropped`:
+
+```go
+dropped := cache.GetDropped()
+```
+
+The counter is reset on every call. If the cache's gc is running, `GetDropped` waits for it to finish; it's meant to be called asynchronously for statistics/monitoring purposes.
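This changeset also adds `SetMaxSize` (see the cache.go and layeredcache.go hunks above), which the readme hunk does not yet document. A minimal usage sketch, with an invented limit:

```go
// Shrink the limit; if the cached size now exceeds it, the worker runs a gc
// and the evictions are reported by the next GetDropped call.
cache.SetMaxSize(5000)
```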
### Stop

The cache's background worker can be stopped by calling `Stop`. Once `Stop` is called the cache should not be used (calls are likely to panic). Stop must be called in order to allow the garbage collector to reap the cache.
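A minimal shutdown sketch (assuming the cache was created as in the Configuration section above):

```go
cache := ccache.New(ccache.Configure())
defer cache.Stop() // the cache must not be used after Stop returns
```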
@@ -126,7 +144,7 @@ In practice, `Release` wouldn't be called until later, at some other place in yo

There are a couple of reasons to use the tracking mode if other parts of your code also hold references to objects. First, if you're already going to hold a reference to these objects, there's really no reason not to have them in the cache - the memory is used up anyway.

-More important, it helps ensure that you're code returns consistent data. With tracking, "user:4" might be purged, and a subsequent `Fetch` would reload the data. This can result in different versions of "user:4" being returned by different parts of your system.
+More important, it helps ensure that your code returns consistent data. With tracking, "user:4" might be purged, and a subsequent `Fetch` would reload the data. This can result in different versions of "user:4" being returned by different parts of your system.

## LayeredCache