mirror of
https://github.com/datarhei/core.git
synced 2025-10-08 01:10:20 +08:00
Add cache for file to node relation in cluster proxy
This commit is contained in:
97
cluster/proxy/cache.go
Normal file
97
cluster/proxy/cache.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TimeSource interface {
|
||||||
|
Now() time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type StdTimeSource struct{}
|
||||||
|
|
||||||
|
func (s *StdTimeSource) Now() time.Time {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
type CacheEntry[T any] struct {
|
||||||
|
value T
|
||||||
|
validUntil time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type Cache[T any] struct {
|
||||||
|
ts TimeSource
|
||||||
|
lock sync.Mutex
|
||||||
|
entries map[string]CacheEntry[T]
|
||||||
|
lastPurge time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCache[T any](ts TimeSource) *Cache[T] {
|
||||||
|
c := &Cache[T]{
|
||||||
|
ts: ts,
|
||||||
|
entries: map[string]CacheEntry[T]{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.ts == nil {
|
||||||
|
c.ts = &StdTimeSource{}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.lastPurge = c.ts.Now()
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) Get(key string) (T, error) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
e, ok := c.entries[key]
|
||||||
|
if !ok {
|
||||||
|
var noop T
|
||||||
|
return noop, errors.New("not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.ts.Now().After(e.validUntil) {
|
||||||
|
delete(c.entries, key)
|
||||||
|
var noop T
|
||||||
|
return noop, errors.New("not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) Put(key string, value T, ttl time.Duration) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
now := c.ts.Now()
|
||||||
|
|
||||||
|
if now.Sub(c.lastPurge) > time.Minute {
|
||||||
|
c.purge(now)
|
||||||
|
c.lastPurge = now
|
||||||
|
}
|
||||||
|
|
||||||
|
e := c.entries[key]
|
||||||
|
|
||||||
|
e.value = value
|
||||||
|
e.validUntil = now.Add(ttl)
|
||||||
|
|
||||||
|
c.entries[key] = e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) Delete(key string) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
delete(c.entries, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache[T]) purge(now time.Time) {
|
||||||
|
for key, e := range c.entries {
|
||||||
|
if now.After(e.validUntil) {
|
||||||
|
delete(c.entries, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
72
cluster/proxy/cache_test.go
Normal file
72
cluster/proxy/cache_test.go
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testTimeSource struct {
|
||||||
|
now time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testTimeSource) Now() time.Time {
|
||||||
|
return t.now
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCache(t *testing.T) {
|
||||||
|
ts := &testTimeSource{
|
||||||
|
now: time.Unix(0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewCache[string](ts)
|
||||||
|
|
||||||
|
_, err := c.Get("foo")
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
c.Put("foo", "bar", 10*time.Second)
|
||||||
|
|
||||||
|
v, err := c.Get("foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "bar", v)
|
||||||
|
|
||||||
|
ts.now = time.Unix(10, 0)
|
||||||
|
|
||||||
|
v, err = c.Get("foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "bar", v)
|
||||||
|
|
||||||
|
ts.now = time.Unix(11, 0)
|
||||||
|
|
||||||
|
_, err = c.Get("foo")
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCachePurge(t *testing.T) {
|
||||||
|
ts := &testTimeSource{
|
||||||
|
now: time.Unix(0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewCache[string](ts)
|
||||||
|
|
||||||
|
c.Put("foo", "bar", 10*time.Second)
|
||||||
|
|
||||||
|
v, err := c.Get("foo")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "bar", v)
|
||||||
|
|
||||||
|
ts.now = time.Unix(59, 0)
|
||||||
|
|
||||||
|
c.Put("foz", "boz", 10*time.Second)
|
||||||
|
|
||||||
|
_, ok := c.entries["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
ts.now = time.Unix(61, 0)
|
||||||
|
|
||||||
|
c.Put("foz", "boz", 10*time.Second)
|
||||||
|
|
||||||
|
_, ok = c.entries["foo"]
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
@@ -59,9 +59,10 @@ type proxy struct {
|
|||||||
nodesLock sync.RWMutex
|
nodesLock sync.RWMutex
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
running bool
|
running bool
|
||||||
|
|
||||||
|
cache *Cache[string]
|
||||||
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,6 +72,7 @@ func NewProxy(config ProxyConfig) (Proxy, error) {
|
|||||||
p := &proxy{
|
p := &proxy{
|
||||||
id: config.ID,
|
id: config.ID,
|
||||||
nodes: map[string]Node{},
|
nodes: map[string]Node{},
|
||||||
|
cache: NewCache[string](nil),
|
||||||
logger: config.Logger,
|
logger: config.Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -323,11 +325,21 @@ func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {
|
func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {
|
||||||
id, err := p.getNodeIDForFile(prefix, path)
|
id, err := p.cache.Get(prefix + ":" + path)
|
||||||
|
if err == nil {
|
||||||
|
node, err := p.GetNode(id)
|
||||||
|
if err == nil {
|
||||||
|
return node, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err = p.getNodeIDForFile(prefix, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.cache.Put(prefix+":"+path, id, 5*time.Second)
|
||||||
|
|
||||||
return p.GetNode(id)
|
return p.GetNode(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user