Compare commits
6 Commits
setable_bu
...
v3.0.5
Author | SHA1 | Date | |
---|---|---|---|
![]() |
62cd8cc8c3 | ||
![]() |
b26c342793 | ||
![]() |
dd0671989b | ||
![]() |
0f8575167d | ||
![]() |
fd8f81fe86 | ||
![]() |
a25552af28 |
2
Makefile
2
Makefile
@@ -1,6 +1,6 @@
|
||||
.PHONY: t
|
||||
t:
|
||||
go test -race -count=1 .
|
||||
go test -race -count=1 ./...
|
||||
|
||||
.PHONY: f
|
||||
f:
|
||||
|
30
bucket.go
30
bucket.go
@@ -35,6 +35,30 @@ func (b *bucket[T]) get(key string) *Item[T] {
|
||||
return b.lookup[key]
|
||||
}
|
||||
|
||||
func (b *bucket[T]) setnx(key string, value T, duration time.Duration, track bool) *Item[T] {
|
||||
b.RLock()
|
||||
item := b.lookup[key]
|
||||
b.RUnlock()
|
||||
if item != nil {
|
||||
return item
|
||||
}
|
||||
|
||||
expires := time.Now().Add(duration).UnixNano()
|
||||
newItem := newItem(key, value, expires, track)
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
// check again under write lock
|
||||
item = b.lookup[key]
|
||||
if item != nil {
|
||||
return item
|
||||
}
|
||||
|
||||
b.lookup[key] = newItem
|
||||
return newItem
|
||||
}
|
||||
|
||||
func (b *bucket[T]) set(key string, value T, duration time.Duration, track bool) (*Item[T], *Item[T]) {
|
||||
expires := time.Now().Add(duration).UnixNano()
|
||||
item := newItem(key, value, expires, track)
|
||||
@@ -98,8 +122,10 @@ func (b *bucket[T]) deletePrefix(prefix string, deletables chan *Item[T]) int {
|
||||
}, deletables)
|
||||
}
|
||||
|
||||
// we expect the caller to have acquired a write lock
|
||||
func (b *bucket[T]) clear() {
|
||||
b.Lock()
|
||||
for _, item := range b.lookup {
|
||||
item.promotions = -2
|
||||
}
|
||||
b.lookup = make(map[string]*Item[T])
|
||||
b.Unlock()
|
||||
}
|
||||
|
55
cache.go
55
cache.go
@@ -43,7 +43,6 @@ type Cache[T any] struct {
|
||||
bucketMask uint32
|
||||
deletables chan *Item[T]
|
||||
promotables chan *Item[T]
|
||||
setables chan *Item[T]
|
||||
}
|
||||
|
||||
// Create a new cache with the specified configuration
|
||||
@@ -57,7 +56,6 @@ func New[T any](config *Configuration[T]) *Cache[T] {
|
||||
buckets: make([]*bucket[T], config.buckets),
|
||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||
setables: make(chan *Item[T], config.setableBuffer),
|
||||
}
|
||||
for i := 0; i < config.buckets; i++ {
|
||||
c.buckets[i] = &bucket[T]{
|
||||
@@ -148,6 +146,11 @@ func (c *Cache[T]) Set(key string, value T, duration time.Duration) {
|
||||
c.set(key, value, duration, false)
|
||||
}
|
||||
|
||||
// Setnx set the value in the cache for the specified duration if not exists
|
||||
func (c *Cache[T]) Setnx(key string, value T, duration time.Duration) {
|
||||
c.bucket(key).setnx(key, value, duration, false)
|
||||
}
|
||||
|
||||
// Replace the value if it exists, does not set if it doesn't.
|
||||
// Returns true if the item existed an was replaced, false otherwise.
|
||||
// Replace does not reset item's TTL
|
||||
@@ -198,7 +201,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool)
|
||||
if existing != nil {
|
||||
c.deletables <- existing
|
||||
}
|
||||
c.setables <- item
|
||||
c.promotables <- item
|
||||
return item
|
||||
}
|
||||
|
||||
@@ -208,6 +211,24 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
|
||||
return c.buckets[h.Sum32()&c.bucketMask]
|
||||
}
|
||||
|
||||
func (c *Cache[T]) halted(fn func()) {
|
||||
c.halt()
|
||||
defer c.unhalt()
|
||||
fn()
|
||||
}
|
||||
|
||||
func (c *Cache[T]) halt() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache[T]) unhalt() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache[T]) worker() {
|
||||
dropped := 0
|
||||
cc := c.control
|
||||
@@ -222,8 +243,6 @@ func (c *Cache[T]) worker() {
|
||||
select {
|
||||
case item := <-c.promotables:
|
||||
promoteItem(item)
|
||||
case item := <-c.setables:
|
||||
promoteItem(item)
|
||||
case item := <-c.deletables:
|
||||
c.doDelete(item)
|
||||
case control := <-cc:
|
||||
@@ -240,11 +259,22 @@ func (c *Cache[T]) worker() {
|
||||
}
|
||||
msg.done <- struct{}{}
|
||||
case controlClear:
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
c.halted(func() {
|
||||
promotables := c.promotables
|
||||
for len(promotables) > 0 {
|
||||
<-promotables
|
||||
}
|
||||
deletables := c.deletables
|
||||
for len(deletables) > 0 {
|
||||
<-deletables
|
||||
}
|
||||
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
})
|
||||
msg.done <- struct{}{}
|
||||
case controlGetSize:
|
||||
msg.res <- c.size
|
||||
@@ -252,7 +282,7 @@ func (c *Cache[T]) worker() {
|
||||
dropped += c.gc()
|
||||
msg.done <- struct{}{}
|
||||
case controlSyncUpdates:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
|
||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||
msg.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
@@ -276,7 +306,6 @@ drain:
|
||||
// that were already sent by the same goroutine.
|
||||
func doAllPendingPromotesAndDeletes[T any](
|
||||
promotables <-chan *Item[T],
|
||||
setables <-chan *Item[T],
|
||||
promoteFn func(*Item[T]),
|
||||
deletables <-chan *Item[T],
|
||||
deleteFn func(*Item[T]),
|
||||
@@ -286,8 +315,6 @@ doAllPromotes:
|
||||
select {
|
||||
case item := <-promotables:
|
||||
promoteFn(item)
|
||||
case item := <-setables:
|
||||
promoteFn(item)
|
||||
default:
|
||||
break doAllPromotes
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -11,6 +12,27 @@ import (
|
||||
"github.com/karlseguin/ccache/v3/assert"
|
||||
)
|
||||
|
||||
func Test_Setnx(t *testing.T) {
|
||||
cache := New(Configure[string]())
|
||||
defer cache.Stop()
|
||||
assert.Equal(t, cache.ItemCount(), 0)
|
||||
|
||||
cache.Set("spice", "flow", time.Minute)
|
||||
assert.Equal(t, cache.ItemCount(), 1)
|
||||
|
||||
// set if exists
|
||||
cache.Setnx("spice", "worm", time.Minute)
|
||||
assert.Equal(t, cache.ItemCount(), 1)
|
||||
assert.Equal(t, cache.Get("spice").Value(), "flow")
|
||||
|
||||
// set if not exists
|
||||
cache.Delete("spice")
|
||||
cache.Setnx("spice", "worm", time.Minute)
|
||||
assert.Equal(t, cache.Get("spice").Value(), "worm")
|
||||
|
||||
assert.Equal(t, cache.ItemCount(), 1)
|
||||
}
|
||||
|
||||
func Test_CacheDeletesAValue(t *testing.T) {
|
||||
cache := New(Configure[string]())
|
||||
defer cache.Stop()
|
||||
@@ -361,18 +383,38 @@ func Test_ConcurrentStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) {
|
||||
cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1))
|
||||
cache.Set("a", "1", time.Minute)
|
||||
cache.Set("b", "2", time.Minute)
|
||||
cache.Set("c", "3", time.Minute)
|
||||
cache.Set("d", "4", time.Minute)
|
||||
cache.Set("e", "5", time.Minute)
|
||||
assert.Nil(t, cache.Get("a"))
|
||||
// "b" could or could not be purged
|
||||
assert.Equal(t, cache.Get("c").Value(), "3")
|
||||
assert.Equal(t, cache.Get("d").Value(), "4")
|
||||
assert.Equal(t, cache.Get("e").Value(), "5")
|
||||
func Test_ConcurrentClearAndSet(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
var stop atomic.Bool
|
||||
var wg sync.WaitGroup
|
||||
|
||||
cache := New(Configure[string]())
|
||||
r := func() {
|
||||
for !stop.Load() {
|
||||
cache.Set("a", "a", time.Minute)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
go r()
|
||||
wg.Add(1)
|
||||
cache.Clear()
|
||||
stop.Store(true)
|
||||
wg.Wait()
|
||||
time.Sleep(time.Millisecond)
|
||||
cache.SyncUpdates()
|
||||
|
||||
known := make(map[string]struct{})
|
||||
for node := cache.list.Head; node != nil; node = node.Next {
|
||||
known[node.Value.key] = struct{}{}
|
||||
}
|
||||
|
||||
for _, bucket := range cache.buckets {
|
||||
for key := range bucket.lookup {
|
||||
_, exists := known[key]
|
||||
assert.True(t, exists)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type SizedItem struct {
|
||||
|
@@ -6,7 +6,6 @@ type Configuration[T any] struct {
|
||||
itemsToPrune int
|
||||
deleteBuffer int
|
||||
promoteBuffer int
|
||||
setableBuffer int
|
||||
getsPerPromote int32
|
||||
tracking bool
|
||||
onDelete func(item *Item[T])
|
||||
@@ -22,7 +21,6 @@ func Configure[T any]() *Configuration[T] {
|
||||
deleteBuffer: 1024,
|
||||
getsPerPromote: 3,
|
||||
promoteBuffer: 1024,
|
||||
setableBuffer: 256,
|
||||
maxSize: 5000,
|
||||
tracking: false,
|
||||
}
|
||||
@@ -61,14 +59,6 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] {
|
||||
return c
|
||||
}
|
||||
|
||||
// The size of the queue for items which are set. If the queue fills, sets block.
|
||||
// Setting this to 0 will ensure that the queue never grows being MaxSize+1
|
||||
// [256]
|
||||
func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] {
|
||||
c.setableBuffer = int(size)
|
||||
return c
|
||||
}
|
||||
|
||||
// The size of the queue for items which should be deleted. If the queue fills
|
||||
// up, calls to Delete() will block
|
||||
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {
|
||||
|
4
item.go
4
item.go
@@ -55,6 +55,10 @@ func (i *Item[T]) shouldPromote(getsPerPromote int32) bool {
|
||||
return i.promotions == getsPerPromote
|
||||
}
|
||||
|
||||
func (i *Item[T]) Key() string {
|
||||
return i.key
|
||||
}
|
||||
|
||||
func (i *Item[T]) Value() T {
|
||||
return i.value
|
||||
}
|
||||
|
@@ -8,6 +8,11 @@ import (
|
||||
"github.com/karlseguin/ccache/v3/assert"
|
||||
)
|
||||
|
||||
func Test_Item_Key(t *testing.T) {
|
||||
item := &Item[int]{key: "foo"}
|
||||
assert.Equal(t, item.Key(), "foo")
|
||||
}
|
||||
|
||||
func Test_Item_Promotability(t *testing.T) {
|
||||
item := &Item[int]{promotions: 4}
|
||||
assert.Equal(t, item.shouldPromote(5), true)
|
||||
|
@@ -111,9 +111,8 @@ func (b *layeredBucket[T]) forEachFunc(primary string, matches func(key string,
|
||||
}
|
||||
}
|
||||
|
||||
// we expect the caller to have acquired a write lock
|
||||
func (b *layeredBucket[T]) clear() {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
for _, bucket := range b.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
|
@@ -16,7 +16,6 @@ type LayeredCache[T any] struct {
|
||||
size int64
|
||||
deletables chan *Item[T]
|
||||
promotables chan *Item[T]
|
||||
setables chan *Item[T]
|
||||
}
|
||||
|
||||
// Create a new layered cache with the specified configuration.
|
||||
@@ -41,7 +40,6 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
|
||||
buckets: make([]*layeredBucket[T], config.buckets),
|
||||
deletables: make(chan *Item[T], config.deleteBuffer),
|
||||
promotables: make(chan *Item[T], config.promoteBuffer),
|
||||
setables: make(chan *Item[T], config.setableBuffer),
|
||||
}
|
||||
for i := 0; i < int(config.buckets); i++ {
|
||||
c.buckets[i] = &layeredBucket[T]{
|
||||
@@ -188,7 +186,7 @@ func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.
|
||||
if existing != nil {
|
||||
c.deletables <- existing
|
||||
}
|
||||
c.setables <- item
|
||||
c.promote(item)
|
||||
return item
|
||||
}
|
||||
|
||||
@@ -198,6 +196,28 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
|
||||
return c.buckets[h.Sum32()&c.bucketMask]
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) halted(fn func()) {
|
||||
c.halt()
|
||||
defer c.unhalt()
|
||||
fn()
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) halt() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) unhalt() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) promote(item *Item[T]) {
|
||||
c.promotables <- item
|
||||
}
|
||||
|
||||
func (c *LayeredCache[T]) worker() {
|
||||
dropped := 0
|
||||
cc := c.control
|
||||
@@ -212,8 +232,6 @@ func (c *LayeredCache[T]) worker() {
|
||||
select {
|
||||
case item := <-c.promotables:
|
||||
promoteItem(item)
|
||||
case item := <-c.setables:
|
||||
promoteItem(item)
|
||||
case item := <-c.deletables:
|
||||
c.doDelete(item)
|
||||
case control := <-cc:
|
||||
@@ -230,11 +248,22 @@ func (c *LayeredCache[T]) worker() {
|
||||
}
|
||||
msg.done <- struct{}{}
|
||||
case controlClear:
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
promotables := c.promotables
|
||||
for len(promotables) > 0 {
|
||||
<-promotables
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
deletables := c.deletables
|
||||
for len(deletables) > 0 {
|
||||
<-deletables
|
||||
}
|
||||
|
||||
c.halted(func() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.size = 0
|
||||
c.list = NewList[*Item[T]]()
|
||||
})
|
||||
msg.done <- struct{}{}
|
||||
case controlGetSize:
|
||||
msg.res <- c.size
|
||||
@@ -242,7 +271,7 @@ func (c *LayeredCache[T]) worker() {
|
||||
dropped += c.gc()
|
||||
msg.done <- struct{}{}
|
||||
case controlSyncUpdates:
|
||||
doAllPendingPromotesAndDeletes(c.promotables, c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
|
||||
msg.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
15
readme.md
15
readme.md
@@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100))
|
||||
|
||||
The most likely configuration options to tweak are:
|
||||
|
||||
* `MaxSize(int)` - the maximum size to store in the cache (default: 5000)
|
||||
* `MaxSize(int)` - the maximum number size to store in the cache (default: 5000)
|
||||
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
|
||||
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improves performance (default: 500)
|
||||
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improved performance (default: 500)
|
||||
|
||||
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
|
||||
|
||||
@@ -47,9 +47,6 @@ Configurations that change the internals of the cache, which aren't as likely to
|
||||
* `PromoteBuffer(int)` - the size of the buffer to use to queue promotions (default: 1024)
|
||||
* `DeleteBuffer(int)` the size of the buffer to use to queue deletions (default: 1024)
|
||||
|
||||
## MaxSize Soft vs Hard Limit
|
||||
By default, MaxSize is a soft limit. This is a result of the fact that cache pruning happens asychronously in a separate "worker" goroutine. The hard limit is MaxSize + SetableBufferSize + 1. The SettableBufferSize can be configured via the `SetableBuffer(INT)` configuration function. It defaults to 256.
|
||||
|
||||
## Usage
|
||||
|
||||
Once the cache is setup, you can `Get`, `Set` and `Delete` items from it. A `Get` returns an `*Item`:
|
||||
@@ -125,6 +122,14 @@ cache.Replace("user:4", user)
|
||||
|
||||
`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.
|
||||
|
||||
### Setnx
|
||||
|
||||
Set the value if not exists. setnx will first check whether kv exists. If it does not exist, set kv in cache. this operation is atomic.
|
||||
|
||||
```go
|
||||
cache.Set("user:4", user, time.Minute * 10)
|
||||
```
|
||||
|
||||
### GetDropped
|
||||
You can get the number of keys evicted due to memory pressure by calling `GetDropped`:
|
||||
|
||||
|
@@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio
|
||||
if existing != nil {
|
||||
s.pCache.deletables <- existing
|
||||
}
|
||||
s.pCache.promotables <- item
|
||||
s.pCache.promote(item)
|
||||
return item
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user