Ability to dynamically SetMaxSize

To support this, rather than adding another field/channel like
`getDroppedReq`, I added a `control` channel that can be used for these
miscellaneous interactions with the worker. The control channel also
takes over the role of the `donec` channel.
Karl Seguin
2020-06-26 20:22:30 +08:00
parent d9aec58960
commit 40275a30c8
4 changed files with 135 additions and 38 deletions
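Editor's note: before the diff itself, the control-channel pattern is easiest to see in isolation. The sketch below is not part of the commit; it models the idea with a stripped-down cache type. A single `chan interface{}` carries typed request structs to the worker goroutine, replies come back on per-request channels, and the channel is also used as a stop signal (in the real code the worker closes it so `Stop` can wait on it; here, `main` closes it to end the loop). The `getDropped`/`setMaxSize` names mirror the diff; everything else is illustrative.

```go
package main

import "fmt"

// Request types understood by the worker; these mirror the getDropped and
// setMaxSize structs added in this commit.
type getDropped struct{ res chan int }
type setMaxSize struct{ size int64 }

// toyCache is a stand-in for the real Cache; only the fields needed to show
// the control-channel pattern are included.
type toyCache struct {
	size, maxSize int64
	control       chan interface{}
}

// worker owns dropped and maxSize, so no locking is needed: every mutation
// happens on this goroutine. The loop exits when control is closed.
func (c *toyCache) worker() {
	dropped := 0
	for msg := range c.control {
		switch m := msg.(type) {
		case getDropped:
			m.res <- dropped // reply on the per-request channel
			dropped = 0
		case setMaxSize:
			c.maxSize = m.size
			if c.size > c.maxSize {
				dropped++ // the real worker runs c.gc() here
			}
		}
	}
}

func main() {
	c := &toyCache{size: 5, control: make(chan interface{})}
	go c.worker()

	c.control <- setMaxSize{size: 3} // shrink below the current size

	res := make(chan int)
	c.control <- getDropped{res: res}
	fmt.Println("dropped:", <-res)

	// In this sketch main stops the worker by closing the channel; the
	// commit instead closes promotables and lets the worker close control.
	close(c.control)
}
```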

cache.go

@@ -8,17 +8,24 @@ import (
 	"time"
 )

+// The cache has a generic 'control' channel that is used to send
+// messages to the worker. These are the messages that can be sent to it
+type getDropped struct {
+	res chan int
+}
+
+type setMaxSize struct {
+	size int64
+}
+
 type Cache struct {
 	*Configuration
 	list        *list.List
 	size        int64
 	buckets     []*bucket
 	bucketMask  uint32
 	deletables  chan *Item
 	promotables chan *Item
-	donec         chan struct{}
-	getDroppedReq chan struct{}
-	getDroppedRes chan int
+	control     chan interface{}
 }

 // Create a new cache with the specified configuration
@@ -29,8 +36,7 @@ func New(config *Configuration) *Cache {
 		Configuration: config,
 		bucketMask:    uint32(config.buckets) - 1,
 		buckets:       make([]*bucket, config.buckets),
-		getDroppedReq: make(chan struct{}),
-		getDroppedRes: make(chan int),
+		control:       make(chan interface{}),
 	}
 	for i := 0; i < int(config.buckets); i++ {
 		c.buckets[i] = &bucket{
@@ -138,20 +144,27 @@ func (c *Cache) Clear() {
 // is called are likely to panic
 func (c *Cache) Stop() {
 	close(c.promotables)
-	<-c.donec
+	<-c.control
 }

 // Gets the number of items removed from the cache due to memory pressure since
 // the last time GetDropped was called
 func (c *Cache) GetDropped() int {
-	c.getDroppedReq <- struct{}{}
-	return <-c.getDroppedRes
+	res := make(chan int)
+	c.control <- getDropped{res: res}
+	return <-res
+}
+
+// Sets a new max size. That can result in a GC being run if the new maximum size
+// is smaller than the cached size
+func (c *Cache) SetMaxSize(size int64) {
+	c.control <- setMaxSize{size}
 }

 func (c *Cache) restart() {
 	c.deletables = make(chan *Item, c.deleteBuffer)
 	c.promotables = make(chan *Item, c.promoteBuffer)
-	c.donec = make(chan struct{})
+	c.control = make(chan interface{})
 	go c.worker()
 }
@@ -180,7 +193,7 @@ func (c *Cache) promote(item *Item) {
 }

 func (c *Cache) worker() {
-	defer close(c.donec)
+	defer close(c.control)
 	dropped := 0
 	for {
 		select {
@@ -193,9 +206,17 @@ func (c *Cache) worker() {
 			}
 		case item := <-c.deletables:
 			c.doDelete(item)
-		case _ = <-c.getDroppedReq:
-			c.getDroppedRes <- dropped
-			dropped = 0
+		case control := <-c.control:
+			switch msg := control.(type) {
+			case getDropped:
+				msg.res <- dropped
+				dropped = 0
+			case setMaxSize:
+				c.maxSize = msg.size
+				if c.size > c.maxSize {
+					dropped += c.gc()
+				}
+			}
 		}
 	}
 }
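Editor's note: because every control message is handled on the worker goroutine, `maxSize` and the `dropped` counter are only ever touched there, so the resize needs no extra locking; the unbuffered send in `SetMaxSize` simply blocks until the worker picks the message up. From the caller's side, the new surface is just `SetMaxSize` alongside the existing `GetDropped`. A rough usage sketch follows (import path assumed to be `github.com/karlseguin/ccache`; eviction counts depend on when the worker drains its buffered channels, which is why the tests below sleep before asserting):

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/karlseguin/ccache" // adjust the path/major version to your setup
)

func main() {
	cache := ccache.New(ccache.Configure().MaxSize(9).ItemsToPrune(1))
	for i := 0; i < 5; i++ {
		cache.Set(strconv.Itoa(i), i, time.Minute)
	}

	// Shrinking the limit below the current size makes the worker run a GC.
	cache.SetMaxSize(3)

	// Give the worker time to drain promotables and apply the resize,
	// the same trick the tests use before asserting.
	time.Sleep(10 * time.Millisecond)
	fmt.Println("dropped:", cache.GetDropped())

	// Growing the limit never evicts; it only raises the threshold.
	cache.SetMaxSize(100)
}
```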

cache_test.go

@@ -208,6 +208,38 @@ func (_ CacheTests) ReplaceChangesSize() {
 	checkSize(cache, 5)
 }

+func (_ CacheTests) ResizeOnTheFly() {
+	cache := New(Configure().MaxSize(9).ItemsToPrune(1))
+	for i := 0; i < 5; i++ {
+		cache.Set(strconv.Itoa(i), i, time.Minute)
+	}
+	cache.SetMaxSize(3)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(2)
+	Expect(cache.Get("0")).To.Equal(nil)
+	Expect(cache.Get("1")).To.Equal(nil)
+	Expect(cache.Get("2").Value()).To.Equal(2)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+
+	cache.Set("5", 5, time.Minute)
+	time.Sleep(time.Millisecond * 5)
+	Expect(cache.GetDropped()).To.Equal(1)
+	Expect(cache.Get("2")).To.Equal(nil)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+	Expect(cache.Get("5").Value()).To.Equal(5)
+
+	cache.SetMaxSize(10)
+	cache.Set("6", 6, time.Minute)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(0)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+	Expect(cache.Get("5").Value()).To.Equal(5)
+	Expect(cache.Get("6").Value()).To.Equal(6)
+}
+
 type SizedItem struct {
 	id int
 	s  int64

layered.go

@@ -10,15 +10,13 @@ import (
 type LayeredCache struct {
 	*Configuration
 	list        *list.List
 	buckets     []*layeredBucket
 	bucketMask  uint32
 	size        int64
 	deletables  chan *Item
 	promotables chan *Item
-	donec         chan struct{}
-	getDroppedReq chan struct{}
-	getDroppedRes chan int
+	control     chan interface{}
 }

 // Create a new layered cache with the specified configuration.
@@ -41,8 +39,7 @@ func Layered(config *Configuration) *LayeredCache {
 		bucketMask:  uint32(config.buckets) - 1,
 		buckets:     make([]*layeredBucket, config.buckets),
 		deletables:  make(chan *Item, config.deleteBuffer),
-		getDroppedReq: make(chan struct{}),
-		getDroppedRes: make(chan int),
+		control:     make(chan interface{}),
 	}
 	for i := 0; i < int(config.buckets); i++ {
 		c.buckets[i] = &layeredBucket{
@@ -163,19 +160,26 @@ func (c *LayeredCache) Clear() {
 func (c *LayeredCache) Stop() {
 	close(c.promotables)
-	<-c.donec
+	<-c.control
 }

 // Gets the number of items removed from the cache due to memory pressure since
 // the last time GetDropped was called
 func (c *LayeredCache) GetDropped() int {
-	c.getDroppedReq <- struct{}{}
-	return <-c.getDroppedRes
+	res := make(chan int)
+	c.control <- getDropped{res: res}
+	return <-res
+}
+
+// Sets a new max size. That can result in a GC being run if the new maximum size
+// is smaller than the cached size
+func (c *LayeredCache) SetMaxSize(size int64) {
+	c.control <- setMaxSize{size}
 }

 func (c *LayeredCache) restart() {
 	c.promotables = make(chan *Item, c.promoteBuffer)
-	c.donec = make(chan struct{})
+	c.control = make(chan interface{})
 	go c.worker()
 }
@@ -199,7 +203,7 @@ func (c *LayeredCache) promote(item *Item) {
 }

 func (c *LayeredCache) worker() {
-	defer close(c.donec)
+	defer close(c.control)
 	dropped := 0
 	for {
 		select {
@@ -220,9 +224,17 @@ func (c *LayeredCache) worker() {
 				}
 				c.list.Remove(item.element)
 			}
-		case _ = <-c.getDroppedReq:
-			c.getDroppedRes <- dropped
-			dropped = 0
+		case control := <-c.control:
+			switch msg := control.(type) {
+			case getDropped:
+				msg.res <- dropped
+				dropped = 0
+			case setMaxSize:
+				c.maxSize = msg.size
+				if c.size > c.maxSize {
+					dropped += c.gc()
+				}
+			}
 		}
 	}
 }

layeredcache_test.go

@@ -174,6 +174,38 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
 	Expect(cache.GetDropped()).To.Equal(0)
 }

+func (_ LayeredCacheTests) ResizeOnTheFly() {
+	cache := Layered(Configure().MaxSize(9).ItemsToPrune(1))
+	for i := 0; i < 5; i++ {
+		cache.Set(strconv.Itoa(i), "a", i, time.Minute)
+	}
+	cache.SetMaxSize(3)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(2)
+	Expect(cache.Get("0", "a")).To.Equal(nil)
+	Expect(cache.Get("1", "a")).To.Equal(nil)
+	Expect(cache.Get("2", "a").Value()).To.Equal(2)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+
+	cache.Set("5", "a", 5, time.Minute)
+	time.Sleep(time.Millisecond * 5)
+	Expect(cache.GetDropped()).To.Equal(1)
+	Expect(cache.Get("2", "a")).To.Equal(nil)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+	Expect(cache.Get("5", "a").Value()).To.Equal(5)
+
+	cache.SetMaxSize(10)
+	cache.Set("6", "a", 6, time.Minute)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(0)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+	Expect(cache.Get("5", "a").Value()).To.Equal(5)
+	Expect(cache.Get("6", "a").Value()).To.Equal(6)
+}
+
 func newLayered() *LayeredCache {
 	return Layered(Configure())
 }