6 Commits

Author SHA1 Message Date
Karl Seguin
62cd8cc8c3 Merge pull request #85 from rfyiamcool/feat/add_setnx
feat: add setnx (if not exists, set kv)
2023-10-22 20:19:26 +08:00
rfyiamcool
b26c342793 feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-22 19:23:23 +08:00
rfyiamcool
dd0671989b feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-20 10:36:04 +08:00
Karl Seguin
0f8575167d Merge pull request #84 from idsulik/added-key-method-to-item
Added Key() method to Item
2023-10-20 06:51:10 +08:00
Suleiman Dibirov
fd8f81fe86 Added Key() method to Item 2023-10-19 12:16:13 +03:00
Karl Seguin
a25552af28 Attempt to make Clear concurrency-safe
This is an attempt at fixing #81 without imposing a performance hit on the
cache's "normal" (get/set/fetch) activity. Calling "Clear" is now considerably
more expensive.
2023-04-14 15:27:39 +08:00
11 changed files with 184 additions and 57 deletions

View File

@@ -1,6 +1,6 @@
.PHONY: t
t:
go test -race -count=1 .
go test -race -count=1 ./...
.PHONY: f
f:

View File

@@ -35,6 +35,30 @@ func (b *bucket[T]) get(key string) *Item[T] {
return b.lookup[key]
}
func (b *bucket[T]) setnx(key string, value T, duration time.Duration, track bool) *Item[T] {
b.RLock()
item := b.lookup[key]
b.RUnlock()
if item != nil {
return item
}
expires := time.Now().Add(duration).UnixNano()
newItem := newItem(key, value, expires, track)
b.Lock()
defer b.Unlock()
// check again under write lock
item = b.lookup[key]
if item != nil {
return item
}
b.lookup[key] = newItem
return newItem
}
func (b *bucket[T]) set(key string, value T, duration time.Duration, track bool) (*Item[T], *Item[T]) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires, track)
@@ -98,8 +122,10 @@ func (b *bucket[T]) deletePrefix(prefix string, deletables chan *Item[T]) int {
}, deletables)
}
// we expect the caller to have acquired a write lock
func (b *bucket[T]) clear() {
b.Lock()
for _, item := range b.lookup {
item.promotions = -2
}
b.lookup = make(map[string]*Item[T])
b.Unlock()
}

View File

@@ -43,7 +43,6 @@ type Cache[T any] struct {
bucketMask uint32
deletables chan *Item[T]
promotables chan *Item[T]
setables chan *Item[T]
}
// Create a new cache with the specified configuration
@@ -57,7 +56,6 @@ func New[T any](config *Configuration[T]) *Cache[T] {
buckets: make([]*bucket[T], config.buckets),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
setables: make(chan *Item[T], config.setableBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &bucket[T]{
@@ -148,6 +146,11 @@ func (c *Cache[T]) Set(key string, value T, duration time.Duration) {
c.set(key, value, duration, false)
}
// Setnx sets the value in the cache for the specified duration only if the key does not already exist
func (c *Cache[T]) Setnx(key string, value T, duration time.Duration) {
c.bucket(key).setnx(key, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset item's TTL
@@ -198,7 +201,7 @@ func (c *Cache[T]) set(key string, value T, duration time.Duration, track bool)
if existing != nil {
c.deletables <- existing
}
c.setables <- item
c.promotables <- item
return item
}
@@ -208,6 +211,24 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *Cache[T]) halted(fn func()) {
c.halt()
defer c.unhalt()
fn()
}
func (c *Cache[T]) halt() {
for _, bucket := range c.buckets {
bucket.Lock()
}
}
func (c *Cache[T]) unhalt() {
for _, bucket := range c.buckets {
bucket.Unlock()
}
}
func (c *Cache[T]) worker() {
dropped := 0
cc := c.control
@@ -222,8 +243,6 @@ func (c *Cache[T]) worker() {
select {
case item := <-c.promotables:
promoteItem(item)
case item := <-c.setables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-cc:
@@ -240,11 +259,22 @@ func (c *Cache[T]) worker() {
}
msg.done <- struct{}{}
case controlClear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
c.halted(func() {
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case controlGetSize:
msg.res <- c.size
@@ -252,7 +282,7 @@ func (c *Cache[T]) worker() {
dropped += c.gc()
msg.done <- struct{}{}
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, c.setables, promoteItem, c.deletables, c.doDelete)
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
@@ -276,7 +306,6 @@ drain:
// that were already sent by the same goroutine.
func doAllPendingPromotesAndDeletes[T any](
promotables <-chan *Item[T],
setables <-chan *Item[T],
promoteFn func(*Item[T]),
deletables <-chan *Item[T],
deleteFn func(*Item[T]),
@@ -286,8 +315,6 @@ doAllPromotes:
select {
case item := <-promotables:
promoteFn(item)
case item := <-setables:
promoteFn(item)
default:
break doAllPromotes
}

View File

@@ -4,6 +4,7 @@ import (
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -11,6 +12,27 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Setnx(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
assert.Equal(t, cache.ItemCount(), 0)
cache.Set("spice", "flow", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
// setnx on an existing key: the value must not be overwritten
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
assert.Equal(t, cache.Get("spice").Value(), "flow")
// set if not exists
cache.Delete("spice")
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.Get("spice").Value(), "worm")
assert.Equal(t, cache.ItemCount(), 1)
}
func Test_CacheDeletesAValue(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
@@ -361,18 +383,38 @@ func Test_ConcurrentStop(t *testing.T) {
}
}
func Test_UnbufferedSetable_Enforces_MaxSize(t *testing.T) {
cache := New(Configure[string]().MaxSize(3).SetableBuffer(0).ItemsToPrune(1))
cache.Set("a", "1", time.Minute)
cache.Set("b", "2", time.Minute)
cache.Set("c", "3", time.Minute)
cache.Set("d", "4", time.Minute)
cache.Set("e", "5", time.Minute)
assert.Nil(t, cache.Get("a"))
// "b" could or could not be purged
assert.Equal(t, cache.Get("c").Value(), "3")
assert.Equal(t, cache.Get("d").Value(), "4")
assert.Equal(t, cache.Get("e").Value(), "5")
func Test_ConcurrentClearAndSet(t *testing.T) {
for i := 0; i < 100; i++ {
var stop atomic.Bool
var wg sync.WaitGroup
cache := New(Configure[string]())
r := func() {
for !stop.Load() {
cache.Set("a", "a", time.Minute)
}
wg.Done()
}
go r()
wg.Add(1)
cache.Clear()
stop.Store(true)
wg.Wait()
time.Sleep(time.Millisecond)
cache.SyncUpdates()
known := make(map[string]struct{})
for node := cache.list.Head; node != nil; node = node.Next {
known[node.Value.key] = struct{}{}
}
for _, bucket := range cache.buckets {
for key := range bucket.lookup {
_, exists := known[key]
assert.True(t, exists)
}
}
}
}
type SizedItem struct {

View File

@@ -6,7 +6,6 @@ type Configuration[T any] struct {
itemsToPrune int
deleteBuffer int
promoteBuffer int
setableBuffer int
getsPerPromote int32
tracking bool
onDelete func(item *Item[T])
@@ -22,7 +21,6 @@ func Configure[T any]() *Configuration[T] {
deleteBuffer: 1024,
getsPerPromote: 3,
promoteBuffer: 1024,
setableBuffer: 256,
maxSize: 5000,
tracking: false,
}
@@ -61,14 +59,6 @@ func (c *Configuration[T]) PromoteBuffer(size uint32) *Configuration[T] {
return c
}
// The size of the queue for items which are set. If the queue fills, sets block.
// Setting this to 0 will ensure that the queue never grows beyond MaxSize+1
// [256]
func (c *Configuration[T]) SetableBuffer(size uint32) *Configuration[T] {
c.setableBuffer = int(size)
return c
}
// The size of the queue for items which should be deleted. If the queue fills
// up, calls to Delete() will block
func (c *Configuration[T]) DeleteBuffer(size uint32) *Configuration[T] {

View File

@@ -55,6 +55,10 @@ func (i *Item[T]) shouldPromote(getsPerPromote int32) bool {
return i.promotions == getsPerPromote
}
func (i *Item[T]) Key() string {
return i.key
}
func (i *Item[T]) Value() T {
return i.value
}

View File

@@ -8,6 +8,11 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Item_Key(t *testing.T) {
item := &Item[int]{key: "foo"}
assert.Equal(t, item.Key(), "foo")
}
func Test_Item_Promotability(t *testing.T) {
item := &Item[int]{promotions: 4}
assert.Equal(t, item.shouldPromote(5), true)

View File

@@ -111,9 +111,8 @@ func (b *layeredBucket[T]) forEachFunc(primary string, matches func(key string,
}
}
// we expect the caller to have acquired a write lock
func (b *layeredBucket[T]) clear() {
b.Lock()
defer b.Unlock()
for _, bucket := range b.buckets {
bucket.clear()
}

View File

@@ -16,7 +16,6 @@ type LayeredCache[T any] struct {
size int64
deletables chan *Item[T]
promotables chan *Item[T]
setables chan *Item[T]
}
// Create a new layered cache with the specified configuration.
@@ -41,7 +40,6 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
buckets: make([]*layeredBucket[T], config.buckets),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
setables: make(chan *Item[T], config.setableBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket[T]{
@@ -188,7 +186,7 @@ func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.
if existing != nil {
c.deletables <- existing
}
c.setables <- item
c.promote(item)
return item
}
@@ -198,6 +196,28 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache[T]) halted(fn func()) {
c.halt()
defer c.unhalt()
fn()
}
func (c *LayeredCache[T]) halt() {
for _, bucket := range c.buckets {
bucket.Lock()
}
}
func (c *LayeredCache[T]) unhalt() {
for _, bucket := range c.buckets {
bucket.Unlock()
}
}
func (c *LayeredCache[T]) promote(item *Item[T]) {
c.promotables <- item
}
func (c *LayeredCache[T]) worker() {
dropped := 0
cc := c.control
@@ -212,8 +232,6 @@ func (c *LayeredCache[T]) worker() {
select {
case item := <-c.promotables:
promoteItem(item)
case item := <-c.setables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-cc:
@@ -230,11 +248,22 @@ func (c *LayeredCache[T]) worker() {
}
msg.done <- struct{}{}
case controlClear:
for _, bucket := range c.buckets {
bucket.clear()
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
c.size = 0
c.list = NewList[*Item[T]]()
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
c.halted(func() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case controlGetSize:
msg.res <- c.size
@@ -242,7 +271,7 @@ func (c *LayeredCache[T]) worker() {
dropped += c.gc()
msg.done <- struct{}{}
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, c.promotables, promoteItem, c.deletables, c.doDelete)
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}

View File

@@ -37,9 +37,9 @@ var cache = ccache.New(ccache.Configure[int]().MaxSize(1000).ItemsToPrune(100))
The most likely configuration options to tweak are:
* `MaxSize(int)` - the maximum size to store in the cache (default: 5000)
* `MaxSize(int)` - the maximum size to store in the cache (default: 5000)
* `GetsPerPromote(int)` - the number of times an item is fetched before we promote it. For large caches with long TTLs, it normally isn't necessary to promote an item after every fetch (default: 3)
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improves performance (default: 500)
* `ItemsToPrune(int)` - the number of items to prune when we hit `MaxSize`. Freeing up more than 1 slot at a time improves performance (default: 500)
Configurations that change the internals of the cache, which aren't as likely to need tweaking:
@@ -47,9 +47,6 @@ Configurations that change the internals of the cache, which aren't as likely to
* `PromoteBuffer(int)` - the size of the buffer to use to queue promotions (default: 1024)
* `DeleteBuffer(int)` the size of the buffer to use to queue deletions (default: 1024)
## MaxSize Soft vs Hard Limit
By default, MaxSize is a soft limit. This is a result of the fact that cache pruning happens asynchronously in a separate "worker" goroutine. The hard limit is MaxSize + the setable buffer size + 1. The setable buffer size can be configured via the `SetableBuffer(int)` configuration function. It defaults to 256.
## Usage
Once the cache is setup, you can `Get`, `Set` and `Delete` items from it. A `Get` returns an `*Item`:
@@ -125,6 +122,14 @@ cache.Replace("user:4", user)
`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.
### Setnx
Sets the value only if the key does not already exist. `Setnx` first checks whether the key is present in the cache; if it is not, the key/value pair is stored. This operation is atomic.
```go
cache.Setnx("user:4", user, time.Minute * 10)
```
### GetDropped
You can get the number of keys evicted due to memory pressure by calling `GetDropped`:

View File

@@ -20,7 +20,7 @@ func (s *SecondaryCache[T]) Set(secondary string, value T, duration time.Duratio
if existing != nil {
s.pCache.deletables <- existing
}
s.pCache.promotables <- item
s.pCache.promote(item)
return item
}