diff --git a/cluster/proxy/cache.go b/cluster/proxy/cache.go new file mode 100644 index 00000000..0e7edbb4 --- /dev/null +++ b/cluster/proxy/cache.go @@ -0,0 +1,97 @@ +package proxy + +import ( + "errors" + "sync" + "time" +) + +type TimeSource interface { + Now() time.Time +} + +type StdTimeSource struct{} + +func (s *StdTimeSource) Now() time.Time { + return time.Now() +} + +type CacheEntry[T any] struct { + value T + validUntil time.Time +} + +type Cache[T any] struct { + ts TimeSource + lock sync.Mutex + entries map[string]CacheEntry[T] + lastPurge time.Time +} + +func NewCache[T any](ts TimeSource) *Cache[T] { + c := &Cache[T]{ + ts: ts, + entries: map[string]CacheEntry[T]{}, + } + + if c.ts == nil { + c.ts = &StdTimeSource{} + } + + c.lastPurge = c.ts.Now() + + return c +} + +func (c *Cache[T]) Get(key string) (T, error) { + c.lock.Lock() + defer c.lock.Unlock() + + e, ok := c.entries[key] + if !ok { + var noop T + return noop, errors.New("not found") + } + + if c.ts.Now().After(e.validUntil) { + delete(c.entries, key) + var noop T + return noop, errors.New("not found") + } + + return e.value, nil +} + +func (c *Cache[T]) Put(key string, value T, ttl time.Duration) { + c.lock.Lock() + defer c.lock.Unlock() + + now := c.ts.Now() + + if now.Sub(c.lastPurge) > time.Minute { + c.purge(now) + c.lastPurge = now + } + + e := c.entries[key] + + e.value = value + e.validUntil = now.Add(ttl) + + c.entries[key] = e +} + +func (c *Cache[T]) Delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + + delete(c.entries, key) +} + +func (c *Cache[T]) purge(now time.Time) { + for key, e := range c.entries { + if now.After(e.validUntil) { + delete(c.entries, key) + } + } +} diff --git a/cluster/proxy/cache_test.go b/cluster/proxy/cache_test.go new file mode 100644 index 00000000..5827cd48 --- /dev/null +++ b/cluster/proxy/cache_test.go @@ -0,0 +1,72 @@ +package proxy + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testTimeSource struct { + now time.Time +} + +func (t *testTimeSource) Now() time.Time { + return t.now +} + +func TestCache(t *testing.T) { + ts := &testTimeSource{ + now: time.Unix(0, 0), + } + + c := NewCache[string](ts) + + _, err := c.Get("foo") + require.Error(t, err) + + c.Put("foo", "bar", 10*time.Second) + + v, err := c.Get("foo") + require.NoError(t, err) + require.Equal(t, "bar", v) + + ts.now = time.Unix(10, 0) + + v, err = c.Get("foo") + require.NoError(t, err) + require.Equal(t, "bar", v) + + ts.now = time.Unix(11, 0) + + _, err = c.Get("foo") + require.Error(t, err) +} + +func TestCachePurge(t *testing.T) { + ts := &testTimeSource{ + now: time.Unix(0, 0), + } + + c := NewCache[string](ts) + + c.Put("foo", "bar", 10*time.Second) + + v, err := c.Get("foo") + require.NoError(t, err) + require.Equal(t, "bar", v) + + ts.now = time.Unix(59, 0) + + c.Put("foz", "boz", 10*time.Second) + + _, ok := c.entries["foo"] + require.True(t, ok) + + ts.now = time.Unix(61, 0) + + c.Put("foz", "boz", 10*time.Second) + + _, ok = c.entries["foo"] + require.False(t, ok) +} diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 3d1ea2df..d593676c 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -58,10 +58,11 @@ type proxy struct { nodes map[string]Node // List of known nodes nodesLock sync.RWMutex - lock sync.RWMutex - + lock sync.RWMutex running bool + cache *Cache[string] + logger log.Logger } @@ -71,6 +72,7 @@ func NewProxy(config ProxyConfig) (Proxy, error) { p := &proxy{ id: config.ID, nodes: map[string]Node{}, + cache: NewCache[string](nil), logger: config.Logger, } @@ -323,11 +325,21 @@ func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) { } func (p *proxy) getNodeForFile(prefix, path string) (Node, error) { - id, err := p.getNodeIDForFile(prefix, path) + id, err := p.cache.Get(prefix + ":" + path) + if err == nil { + node, err := p.GetNode(id) + if err == nil { + return node, nil + } + } + + id, err = p.getNodeIDForFile(prefix, path) if err != nil { return nil, err } + p.cache.Put(prefix+":"+path, id, 5*time.Second) + return p.GetNode(id) }