10 Commits

Author SHA1 Message Date
Karl Seguin
62cd8cc8c3 Merge pull request #85 from rfyiamcool/feat/add_setnx
feat: add setnx (if not exists, set kv)
2023-10-22 20:19:26 +08:00
rfyiamcool
b26c342793 feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-22 19:23:23 +08:00
rfyiamcool
dd0671989b feat: add setnx (if not exists, set kv)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
2023-10-20 10:36:04 +08:00
Karl Seguin
0f8575167d Merge pull request #84 from idsulik/added-key-method-to-item
Added Key() method to Item
2023-10-20 06:51:10 +08:00
Suleiman Dibirov
fd8f81fe86 Added Key() method to Item
2023-10-19 12:16:13 +03:00
Karl Seguin
a25552af28 Attempt to make Clear concurrency-safe
This is an attempt at fixing #81 without imposing a performance hit on the
cache's "normal" (get/set/fetch) activity. Calling "Clear" is now considerably
more expensive.
2023-04-14 15:27:39 +08:00
Karl Seguin
35052434f3 Merge pull request #78 from karlseguin/control_stop
Refactor control messages + Stop handling
2023-01-07 12:12:27 +08:00
Karl Seguin
22776be1ee Refactor control messages + Stop handling
Move the control API shared between Cache and LayeredCache into its own struct.
But keep the control logic handling separate - it requires access to the local
values, like dropped and deleteItem.

Stop is now a control message. Channels are no longer closed as part of the stop
process.
2023-01-04 10:40:19 +08:00
Karl Seguin
ece93bf87d On delete, always set promotions == -2 and node == nil
Also, item.promotions doesn't need to be loaded/stored using atomic. Once upon a
time it did. Cache was updated long ago to not use atomic operations on it, but
LayeredCache wasn't. They are both consistent now (they don't use atomic
operations).

Fixes: https://github.com/karlseguin/ccache/issues/76
2022-12-13 20:33:58 +08:00
Karl Seguin
3452e4e261 Fix memory leak
As documented in https://github.com/karlseguin/ccache/issues/76, an entry which
is both GC'd and deleted (either via a delete or an update) will result in the
internal linked list having a nil tail (because removing the same node multiple
times from the linked list does that).

doDelete was already aware of "invalid" nodes (where item.node == nil), so the
solution seems to be as simple as setting item.node = nil during GC.
2022-11-19 08:15:48 +08:00
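To make the failure mode concrete, here is a minimal sketch (hypothetical code; it only mirrors the shape of the library's list unlinking, it is not ccache's implementation) showing how removing the same node twice nils out a doubly linked list's head and tail:

```go
package main

import "fmt"

type node struct {
	value      string
	prev, next *node
}

type list struct {
	head, tail *node
}

func (l *list) remove(n *node) {
	if n.next == nil {
		l.tail = n.prev // n looks like the tail
	} else {
		n.next.prev = n.prev
	}
	if n.prev == nil {
		l.head = n.next // n looks like the head
	} else {
		n.prev.next = n.next
	}
	n.prev, n.next = nil, nil
}

func main() {
	a := &node{value: "a"}
	b := &node{value: "b"}
	a.next, b.prev = b, a
	l := &list{head: a, tail: b}

	l.remove(a) // fine: list is now just [b]
	l.remove(a) // a's links are already nil, so both head and tail get wiped
	fmt.Println(l.head, l.tail) // <nil> <nil>, despite b never being removed
}
```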
10 changed files with 448 additions and 210 deletions

bucket.go

@@ -35,6 +35,30 @@ func (b *bucket[T]) get(key string) *Item[T] {
return b.lookup[key]
}
func (b *bucket[T]) setnx(key string, value T, duration time.Duration, track bool) *Item[T] {
	// fast path: a read lock is enough when the key already exists
	b.RLock()
	item := b.lookup[key]
	b.RUnlock()
	if item != nil {
		return item
	}
	expires := time.Now().Add(duration).UnixNano()
	newItem := newItem(key, value, expires, track)
	b.Lock()
	defer b.Unlock()
	// check again under the write lock: another goroutine may have
	// inserted the key between RUnlock and Lock
	item = b.lookup[key]
	if item != nil {
		return item
	}
	b.lookup[key] = newItem
	return newItem
}
func (b *bucket[T]) set(key string, value T, duration time.Duration, track bool) (*Item[T], *Item[T]) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires, track)
@@ -98,8 +122,10 @@ func (b *bucket[T]) deletePrefix(prefix string, deletables chan *Item[T]) int {
}, deletables)
}
// we expect the caller to have acquired a write lock
func (b *bucket[T]) clear() {
b.Lock()
for _, item := range b.lookup {
item.promotions = -2
}
b.lookup = make(map[string]*Item[T])
b.Unlock()
}
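Note the interplay with the worker: clear marks every item with promotions == -2 before discarding the lookup map because doPromote treats -2 as "deleted before it ever got promoted" and bails out, so stale references still sitting in the promotables buffer cannot re-insert items from a cleared bucket into the list.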

cache.go

@@ -36,13 +36,13 @@ type gc struct {
type Cache[T any] struct {
*Configuration[T]
control
list *List[*Item[T]]
size int64
buckets []*bucket[T]
bucketMask uint32
deletables chan *Item[T]
promotables chan *Item[T]
control chan interface{}
}
// Create a new cache with the specified configuration
@@ -51,16 +51,18 @@ func New[T any](config *Configuration[T]) *Cache[T] {
c := &Cache[T]{
list: NewList[*Item[T]](),
Configuration: config,
control: newControl(),
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket[T], config.buckets),
control: make(chan interface{}),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &bucket[T]{
lookup: make(map[string]*Item[T]),
}
}
c.restart()
go c.worker()
return c
}
@@ -144,6 +146,11 @@ func (c *Cache[T]) Set(key string, value T, duration time.Duration) {
c.set(key, value, duration, false)
}
// Setnx sets the value in the cache for the specified duration if the key does not already exist
func (c *Cache[T]) Setnx(key string, value T, duration time.Duration) {
c.bucket(key).setnx(key, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset item's TTL
@@ -184,94 +191,6 @@ func (c *Cache[T]) Delete(key string) bool {
return false
}
// Clears the cache
// This is a control command.
func (c *Cache[T]) Clear() {
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}
// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic
// This is a control command.
func (c *Cache[T]) Stop() {
close(c.promotables)
<-c.control
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
// This is a control command.
func (c *Cache[T]) GetDropped() int {
return doGetDropped(c.control)
}
func doGetDropped(controlCh chan<- interface{}) int {
res := make(chan int)
controlCh <- getDropped{res: res}
return <-res
}
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now.
//
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
// goroutine that updates its internal data structures asynchronously. This means that the
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
// application code will not care about this, but especially in a test scenario you may want to
// be able to know when the worker has caught up.
//
// This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
// This is a control command.
func (c *Cache[T]) SyncUpdates() {
doSyncUpdates(c.control)
}
func doSyncUpdates(controlCh chan<- interface{}) {
done := make(chan struct{})
controlCh <- syncWorker{done: done}
<-done
}
// Sets a new max size. That can result in a GC being run if the new maximum size
// is smaller than the cached size
// This is a control command.
func (c *Cache[T]) SetMaxSize(size int64) {
done := make(chan struct{})
c.control <- setMaxSize{size: size, done: done}
<-done
}
// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c *Cache[T]) GC() {
done := make(chan struct{})
c.control <- gc{done: done}
<-done
}
// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c *Cache[T]) GetSize() int64 {
res := make(chan int64)
c.control <- getSize{res}
return <-res
}
func (c *Cache[T]) restart() {
c.deletables = make(chan *Item[T], c.deleteBuffer)
c.promotables = make(chan *Item[T], c.promoteBuffer)
c.control = make(chan interface{})
go c.worker()
}
func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
bucket.delete(item.key) //stop other GETs from getting it
c.deletables <- item
@@ -292,49 +211,78 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
// halted runs fn while every bucket is write-locked, freezing all other
// cache activity; this is what makes the reworked Clear safe, and also
// what makes it considerably more expensive
func (c *Cache[T]) halted(fn func()) {
	c.halt()
	defer c.unhalt()
	fn()
}

func (c *Cache[T]) halt() {
	for _, bucket := range c.buckets {
		bucket.Lock()
	}
}

func (c *Cache[T]) unhalt() {
	for _, bucket := range c.buckets {
		bucket.Unlock()
	}
}
func (c *Cache[T]) worker() {
defer close(c.control)
dropped := 0
cc := c.control
promoteItem := func(item *Item[T]) {
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
}
for {
select {
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
case item := <-c.promotables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-c.control:
case control := <-cc:
switch msg := control.(type) {
case getDropped:
case controlStop:
goto drain
case controlGetDropped:
msg.res <- dropped
dropped = 0
case setMaxSize:
case controlSetMaxSize:
c.maxSize = msg.size
if c.size > c.maxSize {
dropped += c.gc()
}
msg.done <- struct{}{}
case clear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
case controlClear:
c.halted(func() {
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case getSize:
case controlGetSize:
msg.res <- c.size
case gc:
case controlGC:
dropped += c.gc()
msg.done <- struct{}{}
case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, c.doDelete)
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
@@ -346,7 +294,6 @@ drain:
case item := <-c.deletables:
c.doDelete(item)
default:
close(c.deletables)
return
}
}
@@ -367,9 +314,7 @@ doAllPromotes:
for {
select {
case item := <-promotables:
if item != nil {
promoteFn(item)
}
promoteFn(item)
default:
break doAllPromotes
}
@@ -394,6 +339,8 @@ func (c *Cache[T]) doDelete(item *Item[T]) {
c.onDelete(item)
}
c.list.Remove(item.node)
item.node = nil
item.promotions = -2
}
}
@@ -438,6 +385,7 @@ func (c *Cache[T]) gc() int {
c.onDelete(item)
}
dropped += 1
item.node = nil
item.promotions = -2
}
node = prev

cache_test.go

@@ -1,8 +1,10 @@
package ccache
import (
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -10,6 +12,27 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Setnx(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
assert.Equal(t, cache.ItemCount(), 0)
cache.Set("spice", "flow", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
// key already exists: Setnx leaves the existing value untouched
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.ItemCount(), 1)
assert.Equal(t, cache.Get("spice").Value(), "flow")
// key was deleted: Setnx now stores the new value
cache.Delete("spice")
cache.Setnx("spice", "worm", time.Minute)
assert.Equal(t, cache.Get("spice").Value(), "worm")
assert.Equal(t, cache.ItemCount(), 1)
}
func Test_CacheDeletesAValue(t *testing.T) {
cache := New(Configure[string]())
defer cache.Stop()
@@ -313,6 +336,87 @@ func Test_CacheForEachFunc(t *testing.T) {
assert.DoesNotContain(t, forEachKeys(cache), "stop")
}
func Test_CachePrune(t *testing.T) {
maxSize := int64(500)
cache := New(Configure[string]().MaxSize(maxSize).ItemsToPrune(50))
epoch := 0
for i := 0; i < 10000; i++ {
epoch += 1
expired := make([]string, 0)
for i := 0; i < 50; i += 1 {
key := strconv.FormatInt(rand.Int63n(maxSize*20), 10)
item := cache.Get(key)
if item == nil || item.TTL() > 1*time.Minute {
expired = append(expired, key)
}
}
for _, key := range expired {
cache.Set(key, key, 5*time.Minute)
}
if epoch%500 == 0 {
assert.True(t, cache.GetSize() <= 500)
}
}
}
func Test_ConcurrentStop(t *testing.T) {
for i := 0; i < 100; i++ {
cache := New(Configure[string]())
r := func() {
for {
key := strconv.Itoa(int(rand.Int31n(100)))
switch rand.Int31n(3) {
case 0:
cache.Get(key)
case 1:
cache.Set(key, key, time.Minute)
case 2:
cache.Delete(key)
}
}
}
go r()
go r()
go r()
time.Sleep(time.Millisecond * 10)
cache.Stop()
}
}
func Test_ConcurrentClearAndSet(t *testing.T) {
for i := 0; i < 100; i++ {
var stop atomic.Bool
var wg sync.WaitGroup
cache := New(Configure[string]())
r := func() {
for !stop.Load() {
cache.Set("a", "a", time.Minute)
}
wg.Done()
}
go r()
wg.Add(1)
cache.Clear()
stop.Store(true)
wg.Wait()
time.Sleep(time.Millisecond)
cache.SyncUpdates()
known := make(map[string]struct{})
for node := cache.list.Head; node != nil; node = node.Next {
known[node.Value.key] = struct{}{}
}
for _, bucket := range cache.buckets {
for key := range bucket.lookup {
_, exists := known[key]
assert.True(t, exists)
}
}
}
}
type SizedItem struct {
id int
s int64

control.go (new file)

@@ -0,0 +1,110 @@
package ccache
type controlGC struct {
done chan struct{}
}
type controlClear struct {
done chan struct{}
}
type controlStop struct {
}
type controlGetSize struct {
res chan int64
}
type controlGetDropped struct {
res chan int
}
type controlSetMaxSize struct {
size int64
done chan struct{}
}
type controlSyncUpdates struct {
done chan struct{}
}
type control chan interface{}
func newControl() chan interface{} {
return make(chan interface{}, 5)
}
// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c control) GC() {
done := make(chan struct{})
c <- controlGC{done: done}
<-done
}
// Sends a stop signal to the worker thread. The worker thread will shut down
// 5 seconds after the last message is received. The cache should not be used
// after Stop is called, but concurrently executing requests should properly finish
// executing.
// This is a control command.
func (c control) Stop() {
c.SyncUpdates()
c <- controlStop{}
}
// Clears the cache
// This is a control command.
func (c control) Clear() {
done := make(chan struct{})
c <- controlClear{done: done}
<-done
}
// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c control) GetSize() int64 {
res := make(chan int64)
c <- controlGetSize{res: res}
return <-res
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
// This is a control command.
func (c control) GetDropped() int {
res := make(chan int)
c <- controlGetDropped{res: res}
return <-res
}
// Sets a new max size. That can result in a GC being run if the new maximum size
// is smaller than the cached size
// This is a control command.
func (c control) SetMaxSize(size int64) {
done := make(chan struct{})
c <- controlSetMaxSize{size: size, done: done}
<-done
}
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now.
//
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
// goroutine that updates its internal data structures asynchronously. This means that the
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
// application code will not care about this, but especially in a test scenario you may want to
// be able to know when the worker has caught up.
//
// This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
// This is a control command.
func (c control) SyncUpdates() {
done := make(chan struct{})
c <- controlSyncUpdates{done: done}
<-done
}
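A sketch of the test-scenario usage described above (hypothetical test, not part of this change; it assumes the standard testing, strconv, and time imports):

```go
func Test_SyncUpdates_Sketch(t *testing.T) {
	cache := New(Configure[int]().MaxSize(3).ItemsToPrune(1))
	defer cache.Stop()

	for i := 0; i < 5; i++ {
		cache.Set(strconv.Itoa(i), i, time.Minute)
	}
	// without this, the worker may not have applied the Sets yet and
	// GetSize could still report a pre-eviction value
	cache.SyncUpdates()
	if size := cache.GetSize(); size > 3 {
		t.Errorf("expected size <= 3, got %d", size)
	}
}
```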

item.go

@@ -55,6 +55,10 @@ func (i *Item[T]) shouldPromote(getsPerPromote int32) bool {
return i.promotions == getsPerPromote
}
func (i *Item[T]) Key() string {
return i.key
}
func (i *Item[T]) Value() T {
return i.value
}
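A minimal illustration of the new accessor (hypothetical helper, assuming fmt is imported):

```go
func logKey(cache *Cache[string]) {
	// Key() recovers the cache key from an item handle, useful when an
	// item is passed around without the key that produced it
	if item := cache.Get("user:4"); item != nil {
		fmt.Println("cached under:", item.Key())
	}
}
```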

item_test.go

@@ -8,6 +8,11 @@ import (
"github.com/karlseguin/ccache/v3/assert"
)
func Test_Item_Key(t *testing.T) {
item := &Item[int]{key: "foo"}
assert.Equal(t, item.Key(), "foo")
}
func Test_Item_Promotability(t *testing.T) {
item := &Item[int]{promotions: 4}
assert.Equal(t, item.shouldPromote(5), true)

layeredbucket.go

@@ -111,9 +111,8 @@ func (b *layeredBucket[T]) forEachFunc(primary string, matches func(key string,
}
}
// we expect the caller to have acquired a write lock
func (b *layeredBucket[T]) clear() {
b.Lock()
defer b.Unlock()
for _, bucket := range b.buckets {
bucket.clear()
}

layeredcache.go

@@ -9,13 +9,13 @@ import (
type LayeredCache[T any] struct {
*Configuration[T]
control
list *List[*Item[T]]
buckets []*layeredBucket[T]
bucketMask uint32
size int64
deletables chan *Item[T]
promotables chan *Item[T]
control chan interface{}
}
// Create a new layered cache with the specified configuration.
@@ -35,17 +35,18 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] {
c := &LayeredCache[T]{
list: NewList[*Item[T]](),
Configuration: config,
control: newControl(),
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket[T], config.buckets),
deletables: make(chan *Item[T], config.deleteBuffer),
control: make(chan interface{}),
promotables: make(chan *Item[T], config.promoteBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket[T]{
buckets: make(map[string]*bucket[T]),
}
}
c.restart()
go c.worker()
return c
}
@@ -180,63 +181,6 @@ func (c *LayeredCache[T]) DeleteFunc(primary string, matches func(key string, it
return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
}
// Clears the cache
func (c *LayeredCache[T]) Clear() {
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}
func (c *LayeredCache[T]) Stop() {
close(c.promotables)
<-c.control
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *LayeredCache[T]) GetDropped() int {
return doGetDropped(c.control)
}
// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now. See Cache.SyncUpdates for details.
func (c *LayeredCache[T]) SyncUpdates() {
doSyncUpdates(c.control)
}
// Sets a new max size. That can result in a GC being run if the new maximum size
// is smaller than the cached size
func (c *LayeredCache[T]) SetMaxSize(size int64) {
done := make(chan struct{})
c.control <- setMaxSize{size: size, done: done}
<-done
}
// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c *LayeredCache[T]) GC() {
done := make(chan struct{})
c.control <- gc{done: done}
<-done
}
// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c *LayeredCache[T]) GetSize() int64 {
res := make(chan int64)
c.control <- getSize{res}
return <-res
}
func (c *LayeredCache[T]) restart() {
c.promotables = make(chan *Item[T], c.promoteBuffer)
c.control = make(chan interface{})
go c.worker()
}
func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.Duration, track bool) *Item[T] {
item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
if existing != nil {
@@ -252,79 +196,121 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *LayeredCache[T]) halted(fn func()) {
c.halt()
defer c.unhalt()
fn()
}
func (c *LayeredCache[T]) halt() {
for _, bucket := range c.buckets {
bucket.Lock()
}
}
func (c *LayeredCache[T]) unhalt() {
for _, bucket := range c.buckets {
bucket.Unlock()
}
}
func (c *LayeredCache[T]) promote(item *Item[T]) {
c.promotables <- item
}
func (c *LayeredCache[T]) worker() {
defer close(c.control)
dropped := 0
cc := c.control
promoteItem := func(item *Item[T]) {
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
}
deleteItem := func(item *Item[T]) {
if item.node == nil {
atomic.StoreInt32(&item.promotions, -2)
} else {
c.size -= item.size
if c.onDelete != nil {
c.onDelete(item)
}
c.list.Remove(item.node)
}
}
for {
select {
case item, ok := <-c.promotables:
if ok == false {
return
}
case item := <-c.promotables:
promoteItem(item)
case item := <-c.deletables:
deleteItem(item)
case control := <-c.control:
c.doDelete(item)
case control := <-cc:
switch msg := control.(type) {
case getDropped:
case controlStop:
goto drain
case controlGetDropped:
msg.res <- dropped
dropped = 0
case setMaxSize:
case controlSetMaxSize:
c.maxSize = msg.size
if c.size > c.maxSize {
dropped += c.gc()
}
msg.done <- struct{}{}
case clear:
for _, bucket := range c.buckets {
bucket.clear()
case controlClear:
promotables := c.promotables
for len(promotables) > 0 {
<-promotables
}
c.size = 0
c.list = NewList[*Item[T]]()
deletables := c.deletables
for len(deletables) > 0 {
<-deletables
}
c.halted(func() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
})
msg.done <- struct{}{}
case getSize:
case controlGetSize:
msg.res <- c.size
case gc:
case controlGC:
dropped += c.gc()
msg.done <- struct{}{}
case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, deleteItem)
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
}
drain:
for {
select {
case item := <-c.deletables:
c.doDelete(item)
default:
return
}
}
}
func (c *LayeredCache[T]) doDelete(item *Item[T]) {
if item.node == nil {
item.promotions = -2
} else {
c.size -= item.size
if c.onDelete != nil {
c.onDelete(item)
}
c.list.Remove(item.node)
item.node = nil
item.promotions = -2
}
}
func (c *LayeredCache[T]) doPromote(item *Item[T]) bool {
// deleted before it ever got promoted
if atomic.LoadInt32(&item.promotions) == -2 {
if item.promotions == -2 {
return false
}
if item.node != nil { //not a new item
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.node)
atomic.StoreInt32(&item.promotions, 0)
item.promotions = 0
}
return false
}
@@ -355,6 +341,7 @@ func (c *LayeredCache[T]) gc() int {
if c.onDelete != nil {
c.onDelete(item)
}
item.node = nil
item.promotions = -2
dropped += 1
}

layeredcache_test.go

@@ -1,6 +1,7 @@
package ccache
import (
"math/rand"
"sort"
"strconv"
"sync/atomic"
@@ -372,6 +373,52 @@ func Test_LayeredCache_EachFunc(t *testing.T) {
assert.DoesNotContain(t, forEachKeysLayered[int](cache, "1"), "stop")
}
func Test_LayeredCachePrune(t *testing.T) {
maxSize := int64(500)
cache := Layered(Configure[string]().MaxSize(maxSize).ItemsToPrune(50))
epoch := 0
for i := 0; i < 10000; i++ {
epoch += 1
expired := make([]string, 0)
for i := 0; i < 50; i += 1 {
key := strconv.FormatInt(rand.Int63n(maxSize*20), 10)
item := cache.Get(key, key)
if item == nil || item.TTL() > 1*time.Minute {
expired = append(expired, key)
}
}
for _, key := range expired {
cache.Set(key, key, key, 5*time.Minute)
}
if epoch%500 == 0 {
assert.True(t, cache.GetSize() <= 500)
}
}
}
func Test_LayeredConcurrentStop(t *testing.T) {
for i := 0; i < 100; i++ {
cache := Layered(Configure[string]())
r := func() {
for {
key := strconv.Itoa(int(rand.Int31n(100)))
switch rand.Int31n(3) {
case 0:
cache.Get(key, key)
case 1:
cache.Set(key, key, key, time.Minute)
case 2:
cache.Delete(key, key)
}
}
}
go r()
go r()
go r()
time.Sleep(time.Millisecond * 10)
cache.Stop()
}
}
func newLayered[T any]() *LayeredCache[T] {
c := Layered[T](Configure[T]())
c.Clear()

readme.md

@@ -122,6 +122,14 @@ cache.Replace("user:4", user)
`Replace` returns true if the item existed (and thus was replaced). In the case where the key was not in the cache, the value *is not* inserted and false is returned.
### Setnx
`Setnx` sets the value only if the key does not already exist. It first checks whether the key is present; if it isn't, the value is stored. The check and the set happen atomically:
```go
cache.Setnx("user:4", user, time.Minute * 10)
```
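For example, mirroring `Test_Setnx` from this change:
```go
cache.Set("spice", "flow", time.Minute)
cache.Setnx("spice", "worm", time.Minute) // no-op: "spice" already exists
// cache.Get("spice").Value() == "flow"

cache.Delete("spice")
cache.Setnx("spice", "worm", time.Minute) // stored: the key was deleted
// cache.Get("spice").Value() == "worm"
```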
### GetDropped
You can get the number of keys evicted due to memory pressure by calling `GetDropped`: