Attempt to make Clear concurrency-safe
This is an attempt at fixing #81 without imposing a performance hit on the cache's "normal" (get/set/fetch) activity. Calling "Clear" is now considerably more expensive.
This commit is contained in:
@@ -98,8 +98,10 @@ func (b *bucket[T]) deletePrefix(prefix string, deletables chan *Item[T]) int {
|
|||||||
}, deletables)
|
}, deletables)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we expect the caller to have acquired a write lock
|
||||||
func (b *bucket[T]) clear() {
|
func (b *bucket[T]) clear() {
|
||||||
b.Lock()
|
for _, item := range b.lookup {
|
||||||
|
item.promotions = -2
|
||||||
|
}
|
||||||
b.lookup = make(map[string]*Item[T])
|
b.lookup = make(map[string]*Item[T])
|
||||||
b.Unlock()
|
|
||||||
}
|
}
|
||||||
|
39
cache.go
39
cache.go
@@ -206,6 +206,24 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
|
|||||||
return c.buckets[h.Sum32()&c.bucketMask]
|
return c.buckets[h.Sum32()&c.bucketMask]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) halted(fn func()) {
|
||||||
|
c.halt()
|
||||||
|
defer c.unhalt()
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) halt() {
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.Lock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) unhalt() {
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache[T]) worker() {
|
func (c *Cache[T]) worker() {
|
||||||
dropped := 0
|
dropped := 0
|
||||||
cc := c.control
|
cc := c.control
|
||||||
@@ -236,11 +254,22 @@ func (c *Cache[T]) worker() {
|
|||||||
}
|
}
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlClear:
|
case controlClear:
|
||||||
for _, bucket := range c.buckets {
|
c.halted(func() {
|
||||||
bucket.clear()
|
promotables := c.promotables
|
||||||
}
|
for len(promotables) > 0 {
|
||||||
c.size = 0
|
<-promotables
|
||||||
c.list = NewList[*Item[T]]()
|
}
|
||||||
|
deletables := c.deletables
|
||||||
|
for len(deletables) > 0 {
|
||||||
|
<-deletables
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.clear()
|
||||||
|
}
|
||||||
|
c.size = 0
|
||||||
|
c.list = NewList[*Item[T]]()
|
||||||
|
})
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlGetSize:
|
case controlGetSize:
|
||||||
msg.res <- c.size
|
msg.res <- c.size
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -361,6 +362,40 @@ func Test_ConcurrentStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_ConcurrentClearAndSet(t *testing.T) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
var stop atomic.Bool
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
cache := New(Configure[string]())
|
||||||
|
r := func() {
|
||||||
|
for !stop.Load() {
|
||||||
|
cache.Set("a", "a", time.Minute)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
go r()
|
||||||
|
wg.Add(1)
|
||||||
|
cache.Clear()
|
||||||
|
stop.Store(true)
|
||||||
|
wg.Wait()
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
cache.SyncUpdates()
|
||||||
|
|
||||||
|
known := make(map[string]struct{})
|
||||||
|
for node := cache.list.Head; node != nil; node = node.Next {
|
||||||
|
known[node.Value.key] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, bucket := range cache.buckets {
|
||||||
|
for key := range bucket.lookup {
|
||||||
|
_, exists := known[key]
|
||||||
|
assert.True(t, exists)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type SizedItem struct {
|
type SizedItem struct {
|
||||||
id int
|
id int
|
||||||
s int64
|
s int64
|
||||||
|
@@ -111,9 +111,8 @@ func (b *layeredBucket[T]) forEachFunc(primary string, matches func(key string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we expect the caller to have acquired a write lock
|
||||||
func (b *layeredBucket[T]) clear() {
|
func (b *layeredBucket[T]) clear() {
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
for _, bucket := range b.buckets {
|
for _, bucket := range b.buckets {
|
||||||
bucket.clear()
|
bucket.clear()
|
||||||
}
|
}
|
||||||
|
@@ -196,6 +196,24 @@ func (c *LayeredCache[T]) bucket(key string) *layeredBucket[T] {
|
|||||||
return c.buckets[h.Sum32()&c.bucketMask]
|
return c.buckets[h.Sum32()&c.bucketMask]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *LayeredCache[T]) halted(fn func()) {
|
||||||
|
c.halt()
|
||||||
|
defer c.unhalt()
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *LayeredCache[T]) halt() {
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.Lock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *LayeredCache[T]) unhalt() {
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *LayeredCache[T]) promote(item *Item[T]) {
|
func (c *LayeredCache[T]) promote(item *Item[T]) {
|
||||||
c.promotables <- item
|
c.promotables <- item
|
||||||
}
|
}
|
||||||
@@ -230,11 +248,22 @@ func (c *LayeredCache[T]) worker() {
|
|||||||
}
|
}
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlClear:
|
case controlClear:
|
||||||
for _, bucket := range c.buckets {
|
promotables := c.promotables
|
||||||
bucket.clear()
|
for len(promotables) > 0 {
|
||||||
|
<-promotables
|
||||||
}
|
}
|
||||||
c.size = 0
|
deletables := c.deletables
|
||||||
c.list = NewList[*Item[T]]()
|
for len(deletables) > 0 {
|
||||||
|
<-deletables
|
||||||
|
}
|
||||||
|
|
||||||
|
c.halted(func() {
|
||||||
|
for _, bucket := range c.buckets {
|
||||||
|
bucket.clear()
|
||||||
|
}
|
||||||
|
c.size = 0
|
||||||
|
c.list = NewList[*Item[T]]()
|
||||||
|
})
|
||||||
msg.done <- struct{}{}
|
msg.done <- struct{}{}
|
||||||
case controlGetSize:
|
case controlGetSize:
|
||||||
msg.res <- c.size
|
msg.res <- c.size
|
||||||
|
Reference in New Issue
Block a user