6 Commits

Author SHA1 Message Date
Karl Seguin
62cd8cc8c3 Merge pull request #85 from rfyiamcool/feat/add_setnx
feat: add setnx (if not exists, set kv)
2023-10-22 20:19:26 +08:00
rfyiamcool
b26c342793 feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-22 19:23:23 +08:00
rfyiamcool
dd0671989b feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-20 10:36:04 +08:00
Karl Seguin
0f8575167d Merge pull request #84 from idsulik/added-key-method-to-item
Added Key() method to Item
2023-10-20 06:51:10 +08:00
Suleiman Dibirov
fd8f81fe86 Added Key() method to Item 2023-10-19 12:16:13 +03:00
Karl Seguin
a25552af28 Attempt to make Clear concurrency-safe
This is an attempt at fixing #81 without imposing a performance hit on the
cache's "normal" (get/set/fetch) activity. Calling "Clear" is now considerably
more expensive.
2023-04-14 15:27:39 +08:00
8 changed files with 174 additions and 13 deletions

View File

@@ -35,6 +35,30 @@ func (b *bucket[T]) get(key string) *Item[T] {
return b.lookup[key]
}
func (b *bucket[T]) setnx(key string, value T, duration time.Duration, track bool) *Item[T] {
b.RLock()
item := b.lookup[key]
b.RUnlock()
if item != nil {
return item
}
expires := time.Now().Add(duration).UnixNano()
newItem := newItem(key, value, expires, track)
b.Lock()
defer b.Unlock()
// check again under write lock
item = b.lookup[key]
if item != nil {
return item
}
b.lookup[key] = newItem
return newItem
}
func (b *bucket[T]) set(key string, value T, duration time.Duration, track bool) (*Item[T], *Item[T]) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires, track)
@@ -98,8 +122,10 @@ func (b *bucket[T]) deletePrefix(prefix string, deletables chan *Item[T]) int {
}, deletables)
}
// we expect the caller to have acquired a write lock
func (b *bucket[T]) clear() {
b.Lock()
for _, item := range b.lookup {
item.promotions = -2
}
b.lookup = make(map[string]*Item[T])
b.Unlock()
}

View File

@@ -146,6 +146,11 @@ func (c *Cache[T]) Set(key string, value T, duration time.Duration) {
c.set(key, value, duration, false)
}
// Setnx set the value in the cache for the specified duration if not exists
func (c *Cache[T]) Setnx(key string, value T, duration time.Duration) {
c.bucket(key).setnx(key, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed an was replaced, false otherwise.
// Replace does not reset item's TTL
@@ -206,6 +211,24 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *Cache[T]) halted(fn func()) {
c.halt()
defer c.unhalt()
fn()
}
func (c *Cache[T]) halt() {
for _, bucket := range c.buckets {
bucket.Lock()
}
}
func (c *Cache[T]) unhalt() {
for _, bucket := range c.buckets {
bucket.Unlock()
}
}
func (c *Cache[T]) worker() {
dropped := 0
cc := c.control
@@ -236,11 +259,22 @@ func (c *Cache[T]) worker() {
}
msg.done <- struct{}{}
case controlClear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
c.halted(func() {
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case controlGetSize:
msg.res <- c.size

View File

@@ -4,6 +4,7 @@ import (
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -11,6 +12,27 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Setnx(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
assert.Equal(t, cache.ItemCount(), 0)
cache.Set("spice", "flow", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
// set if exists
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
assert.Equal(t, cache.Get("spice").Value(), "flow")
// set if not exists
cache.Delete("spice")
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.Get("spice").Value(), "worm")
assert.Equal(t, cache.ItemCount(), 1)
}
func Test_CacheDeletesAValue(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
@@ -361,6 +383,40 @@ func Test_ConcurrentStop(t *testing.T) {
}
}
func Test_ConcurrentClearAndSet(t *testing.T) {
for i := 0; i < 100; i++ {
var stop atomic.Bool
var wg sync.WaitGroup
cache := New(Configure[string]())
r := func() {
for !stop.Load() {
cache.Set("a", "a", time.Minute)
}
wg.Done()
}
go r()
wg.Add(1)
cache.Clear()
stop.Store(true)
wg.Wait()
time.Sleep(time.Millisecond)
cache.SyncUpdates()
known := make(map[string]struct{})
for node := cache.list.Head; node != nil; node = node.Next {
known[node.Value.key] = struct{}{}
}
for _, bucket := range cache.buckets {
for key := range bucket.lookup {
_, exists := known[key]
assert.True(t, exists)
}
}
}
}
type SizedItem struct {
id int
s int64

View File

@@ -55,6 +55,10 @@ func (i *Item[T]) shouldPromote(getsPerPromote int32) bool {
return i.promotions == getsPerPromote
}
func (i *Item[T]) Key() string {
return i.key
}
func (i *Item[T]) Value() T {
return i.value
}

View File

@@ -8,6 +8,11 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Item_Key(t *testing.T) {
item := &Item[int]{key: "foo"}
assert.Equal(t, item.Key(), "foo")
}
func Test_Item_Promotability(t *testing.T) {
item := &Item[int]{promotions: 4}
assert.Equal(t, item.shouldPromote(5), true)

View File

@@ -111,9 +111,8 @@ func (b *layeredBucket[T]) forEachFunc(primary string, matches func(key string,
}
}
// we expect the caller to have acquired a write lock
func (b *layeredBucket[T]) clear() {
b.Lock()
defer b.Unlock()
for _, bucket := range b.buckets {
bucket.clear()
}

View File

@@ -196,6 +196,24 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache[T]) halted(fn func()) {
c.halt()
defer c.unhalt()
fn()
}
func (c *LayeredCache[T]) halt() {
for _, bucket := range c.buckets {
bucket.Lock()
}
}
func (c *LayeredCache[T]) unhalt() {
for _, bucket := range c.buckets {
bucket.Unlock()
}
}
func (c *LayeredCache[T]) promote(item *Item[T]) {
c.promotables <- item
}
@@ -230,11 +248,22 @@ func (c *LayeredCache[T]) worker() {
}
msg.done <- struct{}{}
case controlClear:
for _, bucket := range c.buckets {
bucket.clear()
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
c.size = 0
c.list = NewList[*Item[T]]()
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
c.halted(func() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case controlGetSize:
msg.res <- c.size

View File

@@ -122,6 +122,14 @@ cache.Replace("user:4", user)
`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.
### Setnx
Set the value if not exists. setnx will first check whether kv exists. If it does not exist, set kv in cache. this operation is atomic.
```go
cache.Set("user:4", user, time.Minute * 10)
```
### GetDropped
You can get the number of keys evicted due to memory pressure by calling `GetDropped`: